Fixed AMQ-5160, remove durable subscription in onUnsubscribe()

This commit is contained in:
Dhiraj Bokde 2014-05-13 00:29:40 -07:00 committed by Dejan Bosanac
parent 8947a09eaa
commit 88c6ee97e0
1 changed files with 30 additions and 18 deletions

View File

@ -361,12 +361,13 @@ public class MQTTProtocolConverter {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString())); ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
if( mqttSubscriptionByTopic.containsKey(topicName)) { if( mqttSubscriptionByTopic.containsKey(topicName)) {
if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) { final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
if (topicQoS != mqttSubscription.qos()) {
// remove old subscription as the QoS has changed // remove old subscription as the QoS has changed
onUnSubscribe(topicName); onUnSubscribe(topicName);
} else { } else {
// duplicate SUBSCRIBE packet, find all matching topics and resend retained messages // duplicate SUBSCRIBE packet, find all matching topics and resend retained messages
resendRetainedMessages(topicName, destination); resendRetainedMessages(topicName, destination, mqttSubscription);
return (byte) topicQoS.ordinal(); return (byte) topicQoS.ordinal();
} }
@ -408,7 +409,8 @@ public class MQTTProtocolConverter {
return qos[0]; return qos[0];
} }
private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination) throws MQTTProtocolException { private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
MQTTSubscription mqttSubscription) throws MQTTProtocolException {
// get TopicRegion // get TopicRegion
RegionBroker regionBroker; RegionBroker regionBroker;
try { try {
@ -418,25 +420,26 @@ public class MQTTProtocolConverter {
} }
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
final ConsumerId consumerId = consumerInfo.getConsumerId();
// use actual client id used to create connection to lookup connection context
final String connectionInfoClientId = connectionInfo.getClientId();
final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
// get all matching Topics // get all matching Topics
final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination); final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) { for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
// find matching MQTT subscription for this client
final String mqttTopicName = convertActiveMQToMQTT(dest.getName());
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(new UTF8Buffer(mqttTopicName));
if (mqttSubscription != null) {
// recover retroactive messages for matching subscription
final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
final ConsumerId consumerId = consumerInfo.getConsumerId();
final Subscription subscription = topicRegion.getSubscriptions().get(consumerId);
// use actual client id used to create connection to lookup connection context // recover retroactive messages for matching subscriptions
final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfo.getClientId()); for (Subscription subscription : dest.getConsumers()) {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try { try {
((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
} catch (Exception e) { } catch (Exception e) {
throw new MQTTProtocolException("Error recovering retained messages for " + throw new MQTTProtocolException("Error recovering retained messages for " +
mqttTopicName + ": " + e.getMessage(), false, e); dest.getName() + ": " + e.getMessage(), false, e);
}
} }
} }
} }
@ -467,6 +470,15 @@ public class MQTTProtocolConverter {
removeInfo = info.createRemoveCommand(); removeInfo = info.createRemoveCommand();
} }
sendToActiveMQ(removeInfo, null); sendToActiveMQ(removeInfo, null);
// check if the durable sub also needs to be removed
if (subs.getConsumerInfo().getSubscriptionName() != null) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());
rsi.setClientId(connectionInfo.getClientId());
sendToActiveMQ(rsi, null);
}
} }
} }