https://issues.apache.org/jira/browse/AMQ-4000 - as part of this feature we need to properly send advisories when durable sub unregisters

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1482790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-05-15 12:31:09 +00:00
parent 8461158178
commit e06685fe81
8 changed files with 75 additions and 29 deletions

View File

@ -25,29 +25,14 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.command.*;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -263,6 +248,29 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key);
if (sub == null) {
LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
return;
}
ActiveMQDestination dest = sub.getConsumerInfo().getDestination();
super.removeSubscription(context, info);
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
fireConsumerAdvisory(context, dest, topic, info);
}
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);

View File

@ -405,7 +405,7 @@ public class BrokerView implements BrokerViewMBean {
ConnectionContext context = new ConnectionContext();
context.setBroker(safeGetBroker());
context.setClientId(clientId);
safeGetBroker().removeSubscription(context, info);
brokerService.getBroker().removeSubscription(context, info);
}
// doc comment inherited from BrokerViewMBean

View File

@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
@ -31,6 +32,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected ManagedRegionBroker broker;
protected BrokerService brokerService;
protected String subscriptionName;
protected DurableTopicSubscription durableSub;
@ -40,9 +42,10 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
* @param clientId
* @param sub
*/
public DurableSubscriptionView(ManagedRegionBroker broker, String clientId, String userName, Subscription sub) {
public DurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, String userName, Subscription sub) {
super(clientId, userName, sub);
this.broker = broker;
this.brokerService = brokerService;
this.durableSub=(DurableTopicSubscription) sub;
if (sub != null) {
this.subscriptionName = sub.getConsumerInfo().getSubscriptionName();
@ -87,7 +90,7 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
broker.removeSubscription(context, info);
brokerService.getBroker().removeSubscription(context, info);
}
public String toString() {

View File

@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
@ -41,8 +42,8 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
* @param userName
* @param subInfo
*/
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
super(broker, clientId, null, subscription);
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
super(broker, brokerService, clientId, null, subscription);
this.broker = broker;
this.subscriptionInfo = subInfo;
}
@ -134,7 +135,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
broker.removeSubscription(context, info);
brokerService.getBroker().removeSubscription(context, info);
}
public String toString() {

View File

@ -206,7 +206,7 @@ public class ManagedRegionBroker extends RegionBroker {
} else {
String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
if (sub.getConsumerInfo().isDurable()) {
view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
} else {
if (sub instanceof TopicSubscription) {
view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
@ -509,7 +509,7 @@ public class ManagedRegionBroker extends RegionBroker {
try {
ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
try {
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);

View File

@ -411,6 +411,7 @@ public class RegionBroker extends EmptyBroker {
topicRegion.removeSubscription(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
}
}

View File

@ -378,4 +378,8 @@ public class TopicRegion extends AbstractRegion {
public boolean durableSubscriptionExists(SubscriptionKey key) {
return this.durableSubscriptions.containsKey(key);
}
public DurableTopicSubscription getDurableSubscription(SubscriptionKey key) {
return durableSubscriptions.get(key);
}
}

View File

@ -18,15 +18,16 @@ package org.apache.activemq.usecases;
import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.*;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@ -34,6 +35,7 @@ import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -70,12 +72,14 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
public void doJMXUnsubscribe(boolean restart) throws Exception {
createSubscriptions();
createAdvisorySubscription();
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
restartBroker();
createAdvisorySubscription();
assertCount(100, 0);
}
@ -97,12 +101,14 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
if (restart) {
restartBroker();
createAdvisorySubscription();
assertCount(0, 0);
}
}
public void doConnectionUnsubscribe(boolean restart) throws Exception {
createSubscriptions();
createAdvisorySubscription();
Thread.sleep(1000);
assertCount(100, 0);
@ -131,6 +137,7 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
if (restart) {
restartBroker();
createAdvisorySubscription();
assertCount(100, 0);
}
@ -150,18 +157,21 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
if (restart) {
restartBroker();
createAdvisorySubscription();
assertCount(0, 0);
}
}
public void doDirectUnsubscribe(boolean restart) throws Exception {
createSubscriptions();
createAdvisorySubscription();
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
restartBroker();
createAdvisorySubscription();
assertCount(100, 0);
}
@ -172,9 +182,10 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getRegionBroker());
context.setClientId(getName());
broker.getRegionBroker().removeSubscription(context, info);
broker.getBroker().removeSubscription(context, info);
if (i % 20 == 0) {
Thread.sleep(1000);
assertCount(100 - i - 1, 0);
}
}
@ -183,6 +194,7 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
if (restart) {
restartBroker();
createAdvisorySubscription();
assertCount(0, 0);
}
}
@ -195,6 +207,20 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
}
}
private final AtomicInteger advisories = new AtomicInteger(0);
private void createAdvisorySubscription() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(topic));
advisoryConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (((ActiveMQMessage)message).getDataStructure() instanceof RemoveSubscriptionInfo) {
advisories.incrementAndGet();
}
}
});
}
private void assertCount(int all, int active) throws Exception {
int inactive = all - active;
@ -224,6 +250,9 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
// check the strange false MBean
if (all == 0)
assertEquals(0, countMBean());
// check if we got all advisories
assertEquals(100, all + advisories.get());
}
private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException {