From 88c6ee97e0fc8652f2dba16786cb8cbd7b80a1b7 Mon Sep 17 00:00:00 2001 From: Dhiraj Bokde Date: Tue, 13 May 2014 00:29:40 -0700 Subject: [PATCH] Fixed AMQ-5160, remove durable subscription in onUnsubscribe() --- .../transport/mqtt/MQTTProtocolConverter.java | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index cbb6415d67..71a6fcfb24 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -361,12 +361,13 @@ public class MQTTProtocolConverter { ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString())); if( mqttSubscriptionByTopic.containsKey(topicName)) { - if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) { + final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName); + if (topicQoS != mqttSubscription.qos()) { // remove old subscription as the QoS has changed onUnSubscribe(topicName); } else { // duplicate SUBSCRIBE packet, find all matching topics and resend retained messages - resendRetainedMessages(topicName, destination); + resendRetainedMessages(topicName, destination, mqttSubscription); return (byte) topicQoS.ordinal(); } @@ -408,7 +409,8 @@ public class MQTTProtocolConverter { return qos[0]; } - private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination) throws MQTTProtocolException { + private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination, + MQTTSubscription mqttSubscription) throws MQTTProtocolException { // get TopicRegion RegionBroker regionBroker; try { @@ -418,25 +420,26 @@ public class MQTTProtocolConverter { } final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); + final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo(); + final ConsumerId consumerId = consumerInfo.getConsumerId(); + + // use actual client id used to create connection to lookup connection context + final String connectionInfoClientId = connectionInfo.getClientId(); + final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId); + // get all matching Topics final Set matchingDestinations = topicRegion.getDestinations(destination); for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) { - // find matching MQTT subscription for this client - final String mqttTopicName = convertActiveMQToMQTT(dest.getName()); - final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(new UTF8Buffer(mqttTopicName)); - if (mqttSubscription != null) { - // recover retroactive messages for matching subscription - final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo(); - final ConsumerId consumerId = consumerInfo.getConsumerId(); - final Subscription subscription = topicRegion.getSubscriptions().get(consumerId); - // use actual client id used to create connection to lookup connection context - final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfo.getClientId()); - try { - ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); - } catch (Exception e) { - throw new MQTTProtocolException("Error recovering retained messages for " + - mqttTopicName + ": " + e.getMessage(), false, e); + // recover retroactive messages for matching subscriptions + for (Subscription subscription : dest.getConsumers()) { + if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { + try { + ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); + } catch (Exception e) { + throw new MQTTProtocolException("Error recovering retained messages for " + + dest.getName() + ": " + e.getMessage(), false, e); + } } } } @@ -467,6 +470,15 @@ public class MQTTProtocolConverter { removeInfo = info.createRemoveCommand(); } sendToActiveMQ(removeInfo, null); + + // check if the durable sub also needs to be removed + if (subs.getConsumerInfo().getSubscriptionName() != null) { + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(connectionId); + rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName()); + rsi.setClientId(connectionInfo.getClientId()); + sendToActiveMQ(rsi, null); + } } }