git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1030490 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-11-03 15:06:21 +00:00
parent 66a945a8f1
commit 4fc1712cdc
9 changed files with 408 additions and 30 deletions

View File

@ -179,21 +179,29 @@ public class ManagedRegionBroker extends RegionBroker {
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
try {
ObjectName objectName = new ObjectName(objectNameStr);
SubscriptionView view;
if (sub.getConsumerInfo().isDurable()) {
view = new DurableSubscriptionView(this, context.getClientId(), sub);
if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
// add offline subscribers to inactive list
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(context.getClientId());
info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
info.setDestination(sub.getConsumerInfo().getDestination());
addInactiveSubscription(key, info);
} else {
if (sub instanceof TopicSubscription) {
view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription)sub);
if (sub.getConsumerInfo().isDurable()) {
view = new DurableSubscriptionView(this, context.getClientId(), sub);
} else {
view = new SubscriptionView(context.getClientId(), sub);
if (sub instanceof TopicSubscription) {
view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
} else {
view = new SubscriptionView(context.getClientId(), sub);
}
}
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
}
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
subscriptionMap.put(sub, objectName);
return objectName;
} catch (Exception e) {
@ -227,11 +235,38 @@ public class ManagedRegionBroker extends RegionBroker {
return objectNameStr;
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription sub = super.addConsumer(context, info);
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
// if it was inactive, register it
registerSubscription(context, sub);
}
return sub;
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
for (Subscription sub : subscriptionMap.keySet()) {
if (sub.getConsumerInfo().equals(info)) {
// unregister all consumer subs
unregisterSubscription(subscriptionMap.get(sub), true);
}
}
super.removeConsumer(context, info);
}
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
if (name != null) {
try {
unregisterSubscription(name);
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
}
} catch (Exception e) {
LOG.error("Failed to unregister subscription " + sub, e);
}
@ -337,10 +372,9 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected void unregisterSubscription(ObjectName key) throws Exception {
protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
queueSubscribers.remove(key);
topicSubscribers.remove(key);
inactiveDurableTopicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
if (registeredMBeans.remove(key)) {
@ -355,11 +389,13 @@ public class ManagedRegionBroker extends RegionBroker {
if (view != null) {
// need to put this back in the inactive list
SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId());
info.setSubscriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info);
if (addToInactive) {
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId());
info.setSubscriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info);
}
}
}

View File

@ -138,7 +138,6 @@ public class TopicRegion extends AbstractRegion {
throw new JMSException("Durable consumer is in use");
}
durableSubscriptions.remove(key);
synchronized (destinationsMutex) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Destination dest = iter.next();
@ -149,7 +148,12 @@ public class TopicRegion extends AbstractRegion {
}
}
}
super.removeConsumer(context, sub.getConsumerInfo());
if (subscriptions.get(sub.getConsumerInfo()) != null) {
super.removeConsumer(context, sub.getConsumerInfo());
} else {
// try destroying inactive subscriptions
destroySubscription(sub);
}
}
@Override
@ -159,7 +163,6 @@ public class TopicRegion extends AbstractRegion {
@Override
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
@ -210,7 +213,6 @@ public class TopicRegion extends AbstractRegion {
}
}
}
return rc;
}
@ -250,6 +252,7 @@ public class TopicRegion extends AbstractRegion {
if (sub == null) {
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {

View File

@ -442,7 +442,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
String selector = null;
ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName, selector);
broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
assertEquals("Durable subscriber count", 2, broker.getInactiveDurableTopicSubscribers().length);
assertNotNull("Should have created an mbean name for the durable subscriber!", name1);
@ -450,7 +450,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
// now lets try destroy it
broker.destroyDurableSubscriber(clientID, "subscriber1");
assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
assertEquals("Durable subscriber count", 1, broker.getInactiveDurableTopicSubscribers().length);
}
protected void assertConsumerCounts() throws Exception {

View File

@ -0,0 +1,276 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.Connection;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.File;
import java.lang.management.ManagementFactory;
public class DurableSubscriptionUnsubscribeTest extends TestSupport {
BrokerService broker = null;
Connection connection = null;
ActiveMQTopic topic;
public void testJMXSubscriptionUnsubscribe() throws Exception {
doJMXUnsubscribe(false);
}
public void testJMXSubscriptionUnsubscribeWithRestart() throws Exception {
doJMXUnsubscribe(true);
}
public void testConnectionSubscriptionUnsubscribe() throws Exception {
doConnectionUnsubscribe(false);
}
public void testConnectionSubscriptionUnsubscribeWithRestart() throws Exception {
doConnectionUnsubscribe(true);
}
public void testDirectSubscriptionUnsubscribe() throws Exception {
doDirectUnsubscribe(false);
}
public void testDirectubscriptionUnsubscribeWithRestart() throws Exception {
doDirectUnsubscribe(true);
}
public void doJMXUnsubscribe(boolean restart) throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
Thread.sleep(2 * 1000);
if (restart) {
stopBroker();
startBroker(false);
}
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
ObjectName[] inactive = broker.getAdminView().getInactiveDurableTopicSubscribers();
for (ObjectName subscription: subscriptions) {
mbs.invoke(subscription, "destroy", null, null);
}
for (ObjectName subscription: inactive) {
mbs.invoke(subscription, "destroy", null, null);
}
Thread.sleep(2 * 1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
}
public void testInactiveSubscriptions() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
session.close();
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
session.unsubscribe("SubsId");
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
session.close();
}
public void doConnectionUnsubscribe(boolean restart) throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
Thread.sleep(2 * 1000);
if (restart) {
stopBroker();
startBroker(false);
}
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(100, subscriptions.length);
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe("SubsId" + i);
session.close();
}
Thread.sleep(2 * 1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
}
public void doDirectUnsubscribe(boolean restart) throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
Thread.sleep(2 * 1000);
if (restart) {
stopBroker();
startBroker(false);
}
for (int i = 0; i < 100; i++) {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(getName());
info.setSubscriptionName("SubsId" + i);
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getRegionBroker());
context.setClientId(getName());
broker.getRegionBroker().removeSubscription(context, info);
}
Thread.sleep(2 * 1000);
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
}
private void startBroker(boolean deleteMessages) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
broker.setUseJmx(true);
broker.setBrokerName(getName());
broker.setPersistent(true);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
broker.setPersistenceAdapter(persistenceAdapter);
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
broker.start();
connection = createConnection();
}
private void stopBroker() throws Exception {
if (connection != null)
connection.close();
connection = null;
if (broker != null)
broker.stop();
broker = null;
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
}
@Override
protected void setUp() throws Exception {
super.setUp();
topic = (ActiveMQTopic) createDestination();
startBroker(true);
}
@Override
protected void tearDown() throws Exception {
stopBroker();
super.tearDown();
}
@Override
protected Connection createConnection() throws Exception {
Connection rc = super.createConnection();
rc.setClientID(getName());
rc.start();
return rc;
}
}

View File

@ -63,7 +63,7 @@ public class DurableUnsubscribeTest extends org.apache.activemq.TestSupport {
assertEquals("Subscription is missing.", 1, d.getConsumers().size());
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName[] subNames = broker.getAdminView().getDurableTopicSubscribers();
ObjectName[] subNames = broker.getAdminView().getInactiveDurableTopicSubscribers();
mbs.invoke(subNames[0], "destroy", new Object[0], new String[0]);
assertEquals("Subscription exists.", 0, d.getConsumers().size());

View File

@ -47,24 +47,26 @@ public class ManagedDurableSubscriptionTest extends org.apache.activemq.TestSupp
startBroker();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName subscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
ObjectName inactiveSubscriptionObjectName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
Object active = mbs.getAttribute(subscriptionObjectName, "Active");
assertTrue("Subscription is active.", Boolean.FALSE.equals(active));
Object inactive = mbs.getAttribute(inactiveSubscriptionObjectName, "Active");
assertTrue("Subscription is active.", Boolean.FALSE.equals(inactive));
// activate
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
active = mbs.getAttribute(subscriptionObjectName, "Active");
ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
Object active = mbs.getAttribute(activeSubscriptionObjectName, "Active");
assertTrue("Subscription is INactive.", Boolean.TRUE.equals(active));
// deactivate
connection.close();
connection = null;
active = mbs.getAttribute(subscriptionObjectName, "Active");
assertTrue("Subscription is active.", Boolean.FALSE.equals(active));
inactive = mbs.getAttribute(inactiveSubscriptionObjectName, "Active");
assertTrue("Subscription is active.", Boolean.FALSE.equals(inactive));
}

View File

@ -70,7 +70,7 @@
</form>
<h2>Durable Topic Subscribers</h2>
<h2>Active Durable Topic Subscribers</h2>
<table id="topics" class="sortable autostripe">
@ -107,6 +107,48 @@
</td>
</tr>
</c:forEach>
</tbody>
</table>
<h2>Offline Durable Topic Subscribers</h2>
<table id="topics" class="sortable autostripe">
<thead>
<tr>
<th>Client ID</th>
<th>Subscription Name</th>
<th>Connection ID</th>
<th>Destination</th>
<th>Selector</th>
<th>Pending Queue Size</th>
<th>Dispatched Queue Size</th>
<th>Dispatched Counter</th>
<th>Enqueue Counter</th>
<th>Dequeue Counter</th>
<th>Operations</th>
</tr>
</thead>
<tbody>
<c:forEach items="${requestContext.brokerQuery.inactiveDurableTopicSubscribers}" var="row">
<tr>
<td><form:tooltip text="${row.clientId}" length="10"/></td>
<td><form:tooltip text="${row.subscriptionName}" length="10"/></td>
<td><form:tooltip text="${row.connectionId}" length="10"/></td>
<td><form:tooltip text="${row.destinationName}" length="10"/></td>
<td>${row.selector}</td>
<td>${row.pendingQueueSize}</td>
<td>${row.dispatchedQueueSize}</td>
<td>${row.dispachedCounter}</td>
<td>${row.enqueueCounter}</td>
<td>${row.dequeueCounter}</td>
<td>
<a href="deleteSubscriber.action?clientId=${row.clientId}&subscriberName=${row.subscriptionName}&secret=<c:out value='${sessionScope["secret"]}'/>">Delete</a>
</td>
</tr>
</c:forEach>
</tbody>
</table>

View File

@ -80,7 +80,7 @@ public interface BrokerFacade {
throws Exception;
/**
* All durable subscribers to topics of the broker.
* Active durable subscribers to topics of the broker.
*
* @return not <code>null</code>
* @throws Exception
@ -88,6 +88,16 @@ public interface BrokerFacade {
Collection<DurableSubscriptionViewMBean> getDurableTopicSubscribers()
throws Exception;
/**
* Inactive durable subscribers to topics of the broker.
*
* @return not <code>null</code>
* @throws Exception
*/
Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers()
throws Exception;
/**
* The names of all transport connectors of the broker (f.e. openwire, ssl)
*

View File

@ -76,6 +76,15 @@ public abstract class BrokerFacadeSupport implements BrokerFacade {
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
}
public Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers() throws Exception {
BrokerViewMBean broker = getBrokerAdmin();
if (broker == null) {
return Collections.EMPTY_LIST;
}
ObjectName[] queues = broker.getInactiveDurableTopicSubscribers();
return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
}
public QueueViewMBean getQueue(String name) throws Exception {
return (QueueViewMBean) getDestinationByName(getQueues(), name);
}