From efa55278ec3a7045240c56bbaa74fcbcff837159 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 30 Jul 2014 20:03:48 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5299 Fix duplicate call to unsubscribe. --- .../transport/mqtt/MQTTProtocolConverter.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index cc51ce7a89..eb5bb2b7cf 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -151,11 +151,12 @@ public class MQTTProtocolConverter { void sendToActiveMQ(Command command, ResponseHandler handler) { // Lets intercept message send requests.. - if( command instanceof ActiveMQMessage) { + if (command instanceof ActiveMQMessage) { ActiveMQMessage msg = (ActiveMQMessage) command; - if( !getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$") ) { - // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 spec requirements - if( handler!=null ) { + if (!getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")) { + // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 + // specification requirements + if (handler != null) { try { handler.onResponse(this, new Response()); } catch (IOException e) { @@ -186,7 +187,6 @@ public class MQTTProtocolConverter { * Convert a MQTT command */ public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { - switch (frame.messageType()) { case PINGREQ.TYPE: { LOG.debug("Received a ping from client: " + getClientId()); @@ -240,7 +240,6 @@ public class MQTTProtocolConverter { } void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { - if (connected.get()) { throw new MQTTProtocolException("Already connected."); } @@ -333,7 +332,7 @@ public class MQTTProtocolConverter { getMQTTTransport().sendToMQTT(ack.encode()); List subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId()); - if( connect.cleanSession() ) { + if (connect.cleanSession()) { packetIdGenerator.stopClientSession(getClientId()); deleteDurableSubs(subs); } else { @@ -417,18 +416,16 @@ public class MQTTProtocolConverter { final QoS topicQoS = topic.qos(); ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName)); - if( mqttSubscriptionByTopic.containsKey(topicName)) { + if (mqttSubscriptionByTopic.containsKey(topicName)) { final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName); if (topicQoS != mqttSubscription.qos()) { // remove old subscription as the QoS has changed onUnSubscribe(topicName); } else { - // duplicate SUBSCRIBE packet, find all matching topics and resend retained messages + // duplicate SUBSCRIBE packet, find all matching topics and re-send retained messages resendRetainedMessages(topicName, destination, mqttSubscription); - return (byte) topicQoS.ordinal(); } - onUnSubscribe(topicName); } ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); @@ -438,7 +435,7 @@ public class MQTTProtocolConverter { consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); // create durable subscriptions only when cleansession is false - if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { + if (!connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { consumerInfo.setSubscriptionName(topicQoS + ":" + topicName); } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); @@ -914,7 +911,6 @@ public class MQTTProtocolConverter { char[] chars = name.toCharArray(); for (int i = 0; i < chars.length; i++) { switch(chars[i]) { - case '#': chars[i] = '>'; break; @@ -928,14 +924,12 @@ public class MQTTProtocolConverter { case '*': chars[i] = '+'; break; - case '/': chars[i] = '.'; break; case '.': chars[i] = '/'; break; - } } String rc = new String(chars);