diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 100e14a065..d16d1319ad 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -25,29 +25,14 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.TopicSubscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.broker.region.*; +import org.apache.activemq.command.*; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,6 +248,29 @@ public class AdvisoryBroker extends BrokerFilter { } } + @Override + public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { + SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); + + DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key); + + if (sub == null) { + LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub"); + return; + } + + ActiveMQDestination dest = sub.getConsumerInfo().getDestination(); + + super.removeSubscription(context, info); + + // Don't advise advisory topics. + if (!AdvisorySupport.isAdvisoryTopic(dest)) { + ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); + fireConsumerAdvisory(context, dest, topic, info); + } + + } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.removeProducer(context, info); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 4605a3ef01..b6880006d9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -405,7 +405,7 @@ public class BrokerView implements BrokerViewMBean { ConnectionContext context = new ConnectionContext(); context.setBroker(safeGetBroker()); context.setClientId(clientId); - safeGetBroker().removeSubscription(context, info); + brokerService.getBroker().removeSubscription(context, info); } // doc comment inherited from BrokerViewMBean diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java index 32ee384cf4..77ec8c2c5d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java @@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; @@ -31,6 +32,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { protected ManagedRegionBroker broker; + protected BrokerService brokerService; protected String subscriptionName; protected DurableTopicSubscription durableSub; @@ -40,9 +42,10 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable * @param clientId * @param sub */ - public DurableSubscriptionView(ManagedRegionBroker broker, String clientId, String userName, Subscription sub) { + public DurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, String userName, Subscription sub) { super(clientId, userName, sub); this.broker = broker; + this.brokerService = brokerService; this.durableSub=(DurableTopicSubscription) sub; if (sub != null) { this.subscriptionName = sub.getConsumerInfo().getSubscriptionName(); @@ -87,7 +90,7 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable ConnectionContext context = new ConnectionContext(); context.setBroker(broker); context.setClientId(clientId); - broker.removeSubscription(context, info); + brokerService.getBroker().removeSubscription(context, info); } public String toString() { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java index 5b4827186e..35c0b92ab4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java @@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ConsumerInfo; @@ -41,8 +42,8 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp * @param userName * @param subInfo */ - public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) { - super(broker, clientId, null, subscription); + public InactiveDurableSubscriptionView(ManagedRegionBroker broker, BrokerService brokerService, String clientId, SubscriptionInfo subInfo, Subscription subscription) { + super(broker, brokerService, clientId, null, subscription); this.broker = broker; this.subscriptionInfo = subInfo; } @@ -134,7 +135,7 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp ConnectionContext context = new ConnectionContext(); context.setBroker(broker); context.setClientId(clientId); - broker.removeSubscription(context, info); + brokerService.getBroker().removeSubscription(context, info); } public String toString() { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index cf95b308ce..db273cfd0c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -206,7 +206,7 @@ public class ManagedRegionBroker extends RegionBroker { } else { String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; if (sub.getConsumerInfo().isDurable()) { - view = new DurableSubscriptionView(this, context.getClientId(), userName, sub); + view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub); } else { if (sub instanceof TopicSubscription) { view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub); @@ -509,7 +509,7 @@ public class ManagedRegionBroker extends RegionBroker { try { ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info); ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo); - SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription); + SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription); try { AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 228986cda0..73ebfe00b2 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -411,6 +411,7 @@ public class RegionBroker extends EmptyBroker { topicRegion.removeSubscription(context, info); } finally { inactiveDestinationsPurgeLock.readLock().unlock(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 12e30f42c4..50c1f50b9d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -378,4 +378,8 @@ public class TopicRegion extends AbstractRegion { public boolean durableSubscriptionExists(SubscriptionKey key) { return this.durableSubscriptions.containsKey(key); } + + public DurableTopicSubscription getDurableSubscription(SubscriptionKey key) { + return durableSubscriptions.get(key); + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java index 6de1952061..d0ecfdef90 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java @@ -18,15 +18,16 @@ package org.apache.activemq.usecases; import java.io.File; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.Session; +import javax.jms.*; import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.TestSupport; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -34,6 +35,7 @@ import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -70,12 +72,14 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { public void doJMXUnsubscribe(boolean restart) throws Exception { createSubscriptions(); + createAdvisorySubscription(); Thread.sleep(1000); assertCount(100, 0); if (restart) { restartBroker(); + createAdvisorySubscription(); assertCount(100, 0); } @@ -97,12 +101,14 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { if (restart) { restartBroker(); + createAdvisorySubscription(); assertCount(0, 0); } } public void doConnectionUnsubscribe(boolean restart) throws Exception { createSubscriptions(); + createAdvisorySubscription(); Thread.sleep(1000); assertCount(100, 0); @@ -131,6 +137,7 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { if (restart) { restartBroker(); + createAdvisorySubscription(); assertCount(100, 0); } @@ -150,18 +157,21 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { if (restart) { restartBroker(); + createAdvisorySubscription(); assertCount(0, 0); } } public void doDirectUnsubscribe(boolean restart) throws Exception { createSubscriptions(); + createAdvisorySubscription(); Thread.sleep(1000); assertCount(100, 0); if (restart) { restartBroker(); + createAdvisorySubscription(); assertCount(100, 0); } @@ -172,9 +182,10 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getRegionBroker()); context.setClientId(getName()); - broker.getRegionBroker().removeSubscription(context, info); + broker.getBroker().removeSubscription(context, info); if (i % 20 == 0) { + Thread.sleep(1000); assertCount(100 - i - 1, 0); } } @@ -183,6 +194,7 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { if (restart) { restartBroker(); + createAdvisorySubscription(); assertCount(0, 0); } } @@ -195,6 +207,20 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { } } + private final AtomicInteger advisories = new AtomicInteger(0); + + private void createAdvisorySubscription() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(topic)); + advisoryConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + if (((ActiveMQMessage)message).getDataStructure() instanceof RemoveSubscriptionInfo) { + advisories.incrementAndGet(); + } + } + }); + } private void assertCount(int all, int active) throws Exception { int inactive = all - active; @@ -224,6 +250,9 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { // check the strange false MBean if (all == 0) assertEquals(0, countMBean()); + + // check if we got all advisories + assertEquals(100, all + advisories.get()); } private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException {