From af999fe2b2f5140fd7fcae187b491dddbcde1fe9 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 5 Jun 2015 18:05:03 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5303 Resolves issues with the virtual topic subscription strategy especially when subscribing durably to the Topic portion of a virtual destination. --- .../AbstractMQTTSubscriptionStrategy.java | 74 ++++++++++++ .../MQTTDefaultSubscriptionStrategy.java | 81 +------------ .../MQTTVirtualTopicSubscriptionStrategy.java | 65 +++++++++-- .../MQTTVirtualTopicSubscriptionsTest.java | 110 ++++++++++++++++-- 4 files changed, 231 insertions(+), 99 deletions(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java index bb8419248e..f3bf94e6cf 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java @@ -17,6 +17,10 @@ package org.apache.activemq.transport.mqtt.strategy; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -24,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.PrefetchSubscription; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; @@ -35,9 +40,12 @@ import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolException; +import org.apache.activemq.transport.mqtt.MQTTProtocolSupport; import org.apache.activemq.transport.mqtt.MQTTSubscription; import org.apache.activemq.transport.mqtt.ResponseHandler; import org.apache.activemq.util.LongSequenceGenerator; @@ -61,6 +69,7 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti protected final ConcurrentMap subscriptionsByConsumerId = new ConcurrentHashMap(); protected final ConcurrentMap mqttSubscriptionByTopic = new ConcurrentHashMap(); + protected final Set restoredDurableSubs = Collections.synchronizedSet(new HashSet()); protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); @@ -242,4 +251,69 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti }); } } + + //----- Durable Subscription management methods --------------------------// + + protected void deleteDurableSubs(List subs) { + try { + for (SubscriptionInfo sub : subs) { + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(protocol.getConnectionId()); + rsi.setSubscriptionName(sub.getSubcriptionName()); + rsi.setClientId(sub.getClientId()); + protocol.sendToActiveMQ(rsi, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // ignore failures.. + } + }); + } + } catch (Throwable e) { + LOG.warn("Could not delete the MQTT durable subs.", e); + } + } + + protected void restoreDurableSubs(List subs) { + try { + for (SubscriptionInfo sub : subs) { + String name = sub.getSubcriptionName(); + String[] split = name.split(":", 2); + QoS qoS = QoS.valueOf(split[0]); + onSubscribe(new Topic(split[1], qoS)); + // mark this durable subscription as restored by Broker + restoredDurableSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1])); + } + } catch (IOException e) { + LOG.warn("Could not restore the MQTT durable subs.", e); + } + } + + protected List lookupSubscription(String clientId) throws MQTTProtocolException { + List result = new ArrayList(); + RegionBroker regionBroker; + + try { + regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); + } catch (Exception e) { + throw new MQTTProtocolException("Error recovering durable subscriptions: " + e.getMessage(), false, e); + } + + final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); + List subscriptions = topicRegion.lookupSubscriptions(clientId); + if (subscriptions != null) { + for (DurableTopicSubscription subscription : subscriptions) { + LOG.debug("Recovered durable sub:{} on connect", subscription); + + SubscriptionInfo info = new SubscriptionInfo(); + + info.setDestination(subscription.getActiveMQDestination()); + info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName()); + info.setClientId(clientId); + + result.add(info); + } + } + + return result; + } } diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java index 68d6cb9c8a..63b0e85ea3 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java @@ -17,16 +17,9 @@ package org.apache.activemq.transport.mqtt.strategy; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.broker.region.DurableTopicSubscription; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; @@ -39,20 +32,13 @@ import org.apache.activemq.transport.mqtt.MQTTProtocolSupport; import org.apache.activemq.transport.mqtt.MQTTSubscription; import org.apache.activemq.transport.mqtt.ResponseHandler; import org.fusesource.mqtt.client.QoS; -import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.codec.CONNECT; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Default implementation that uses unmapped topic subscriptions. */ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy { - private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class); - - private final Set restoredSubs = Collections.synchronizedSet(new HashSet()); - @Override public void onConnect(CONNECT connect) throws MQTTProtocolException { List subs = lookupSubscription(protocol.getClientId()); @@ -93,7 +79,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr // check whether the Topic has been recovered in restoreDurableSubs // mark subscription available for recovery for duplicate subscription - if (restoredSubs.remove(destination.getPhysicalName())) { + if (restoredDurableSubs.remove(destination.getPhysicalName())) { return; } @@ -109,7 +95,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr // check if the durable sub also needs to be removed if (subscription.getConsumerInfo().getSubscriptionName() != null) { // also remove it from restored durable subscriptions set - restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); + restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); rsi.setConnectionId(protocol.getConnectionId()); @@ -124,67 +110,4 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr } } } - - private void deleteDurableSubs(List subs) { - try { - for (SubscriptionInfo sub : subs) { - RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); - rsi.setConnectionId(protocol.getConnectionId()); - rsi.setSubscriptionName(sub.getSubcriptionName()); - rsi.setClientId(sub.getClientId()); - protocol.sendToActiveMQ(rsi, new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - // ignore failures.. - } - }); - } - } catch (Throwable e) { - LOG.warn("Could not delete the MQTT durable subs.", e); - } - } - - private void restoreDurableSubs(List subs) { - try { - for (SubscriptionInfo sub : subs) { - String name = sub.getSubcriptionName(); - String[] split = name.split(":", 2); - QoS qoS = QoS.valueOf(split[0]); - onSubscribe(new Topic(split[1], qoS)); - // mark this durable subscription as restored by Broker - restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1])); - } - } catch (IOException e) { - LOG.warn("Could not restore the MQTT durable subs.", e); - } - } - - List lookupSubscription(String clientId) throws MQTTProtocolException { - List result = new ArrayList(); - RegionBroker regionBroker; - - try { - regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); - } catch (Exception e) { - throw new MQTTProtocolException("Error recovering durable subscriptions: " + e.getMessage(), false, e); - } - - final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); - List subscriptions = topicRegion.lookupSubscriptions(clientId); - if (subscriptions != null) { - for (DurableTopicSubscription subscription : subscriptions) { - LOG.debug("Recovered durable sub:{} on connect", subscription); - - SubscriptionInfo info = new SubscriptionInfo(); - - info.setDestination(subscription.getActiveMQDestination()); - info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName()); - info.setClientId(clientId); - - result.add(info); - } - } - - return result; - } } diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java index d0735e131a..468e823295 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java @@ -35,9 +35,12 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolException; +import org.apache.activemq.transport.mqtt.MQTTProtocolSupport; import org.apache.activemq.transport.mqtt.MQTTSubscription; import org.apache.activemq.transport.mqtt.ResponseHandler; import org.fusesource.mqtt.client.QoS; @@ -62,35 +65,53 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti @Override public void onConnect(CONNECT connect) throws MQTTProtocolException { List queues = lookupQueues(protocol.getClientId()); + List subs = lookupSubscription(protocol.getClientId()); + + // When clean session is true we must purge all of the client's old Queue subscriptions + // and any durable subscriptions created on the VirtualTopic instance as well. if (connect.cleanSession()) { deleteDurableQueues(queues); + deleteDurableSubs(subs); } else { restoreDurableQueue(queues); + restoreDurableSubs(subs); } } @Override public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { ActiveMQDestination destination = null; + int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); + if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { - String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." + - VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName); - destination = new ActiveMQQueue(converted); - consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH); + String converted = convertMQTTToActiveMQ(topicName); + if (converted.startsWith(VIRTUALTOPIC_PREFIX)) { + destination = new ActiveMQTopic(converted); + prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; + consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); + } else { + converted = VIRTUALTOPIC_CONSUMER_PREFIX + + protocol.getClientId() + ":" + requestedQoS + "." + + VIRTUALTOPIC_PREFIX + converted; + destination = new ActiveMQQueue(converted); + prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; + } } else { String converted = convertMQTTToActiveMQ(topicName); if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) { - converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName); + converted = VIRTUALTOPIC_PREFIX + converted; } destination = new ActiveMQTopic(converted); - consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); + prefetch = ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; } consumerInfo.setDestination(destination); if (protocol.getActiveMQSubscriptionPrefetch() > 0) { consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); + } else { + consumerInfo.setPrefetchSize(prefetch); } consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); @@ -103,9 +124,15 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti ActiveMQDestination destination = mqttSubscription.getDestination(); + // check whether the Queue has been recovered in restoreDurableQueue + // mark subscription available for recovery for duplicate subscription + if (destination.isQueue() && restoredQueues.remove(destination)) { + return; + } + // check whether the Topic has been recovered in restoreDurableSubs // mark subscription available for recovery for duplicate subscription - if (restoredQueues.remove(destination)) { + if (destination.isTopic() && restoredDurableSubs.remove(destination.getPhysicalName())) { return; } @@ -136,6 +163,20 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti // ignore failures.. } }); + } else if (subscription.getConsumerInfo().getSubscriptionName() != null) { + // also remove it from restored durable subscriptions set + restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); + + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(protocol.getConnectionId()); + rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName()); + rsi.setClientId(protocol.getClientId()); + protocol.sendToActiveMQ(rsi, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // ignore failures.. + } + }); } } } @@ -154,7 +195,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti String destinationName = destination.getPhysicalName(); int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX); if (position >= 0) { - destinationName = destinationName.substring(position+VIRTUALTOPIC_PREFIX.length()).substring(0); + destinationName = destinationName.substring(position + VIRTUALTOPIC_PREFIX.length()).substring(0); } return destinationName; } @@ -171,7 +212,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti private void deleteDurableQueues(List queues) { try { for (ActiveMQQueue queue : queues) { - LOG.debug("Removing subscription for {} ",queue.getPhysicalName()); + LOG.debug("Removing queue subscription for {} ",queue.getPhysicalName()); DestinationInfo removeAction = new DestinationInfo(); removeAction.setConnectionId(protocol.getConnectionId()); removeAction.setDestination(queue); @@ -185,7 +226,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti }); } } catch (Throwable e) { - LOG.warn("Could not delete the MQTT durable subs.", e); + LOG.warn("Could not delete the MQTT queue subsscriptions.", e); } } @@ -199,7 +240,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti tokenizer.nextToken(); String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1)); QoS qoS = QoS.valueOf(qosString); - LOG.trace("Restoring subscription: {}:{}", topicName, qoS); + LOG.trace("Restoring queue subscription: {}:{}", topicName, qoS); ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(queue); @@ -216,7 +257,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti restoredQueues.add(queue); } } catch (IOException e) { - LOG.warn("Could not restore the MQTT durable subs.", e); + LOG.warn("Could not restore the MQTT queue subscriptions.", e); } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java index b4f985c305..052a7ee33c 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java @@ -16,8 +16,14 @@ */ package org.apache.activemq.transport.mqtt; +import static org.junit.Assert.assertTrue; + +import org.apache.activemq.util.Wait; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; /** @@ -32,13 +38,6 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { super.setUp(); } - // TODO - This currently fails on the durable case because we have a hard time - // recovering the original Topic name when a client tries to subscribe - // durable to a VirtualTopic.* type topic. - @Override - @Ignore - public void testRetainedMessageOnVirtualTopics() throws Exception {} - @Override @Test(timeout = 60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { @@ -56,4 +55,99 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { public void testJmsMapping() throws Exception { doTestJmsMapping("VirtualTopic.test.foo"); } + + @Test(timeout = 60 * 1000) + public void testSubscribeOnVirtualTopicAsDurable() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("VirtualTopicSubscriber"); + mqtt.setKeepAlive((short) 2); + mqtt.setCleanSession(false); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String topicName = "VirtualTopic/foo/bah"; + + connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)}); + + assertTrue("Should create a durable subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 1; + } + })); + + connection.unsubscribe(new String[] { topicName }); + + assertTrue("Should remove a durable subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0; + } + })); + + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testDurableVirtaulTopicSubIsRecovered() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("VirtualTopicSubscriber"); + mqtt.setKeepAlive((short) 2); + mqtt.setCleanSession(false); + + final String topicName = "VirtualTopic/foo/bah"; + + { + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)}); + + assertTrue("Should create a durable subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 1; + } + })); + + connection.disconnect(); + } + + assertTrue("Should be one inactive subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + + { + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + assertTrue("Should recover a durable subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 1; + } + })); + + connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)}); + + assertTrue("Should still be just one durable subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 1; + } + })); + + connection.disconnect(); + } + } }