Fix duplicate call to unsubscribe.
This commit is contained in:
Timothy Bish 2014-07-30 20:03:48 -04:00
parent 2cd54248c6
commit efa55278ec
1 changed files with 9 additions and 15 deletions

View File

@ -151,11 +151,12 @@ public class MQTTProtocolConverter {
void sendToActiveMQ(Command command, ResponseHandler handler) { void sendToActiveMQ(Command command, ResponseHandler handler) {
// Lets intercept message send requests.. // Lets intercept message send requests..
if( command instanceof ActiveMQMessage) { if (command instanceof ActiveMQMessage) {
ActiveMQMessage msg = (ActiveMQMessage) command; ActiveMQMessage msg = (ActiveMQMessage) command;
if( !getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$") ) { if (!getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")) {
// We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 spec requirements // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
if( handler!=null ) { // specification requirements
if (handler != null) {
try { try {
handler.onResponse(this, new Response()); handler.onResponse(this, new Response());
} catch (IOException e) { } catch (IOException e) {
@ -186,7 +187,6 @@ public class MQTTProtocolConverter {
* Convert a MQTT command * Convert a MQTT command
*/ */
public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
switch (frame.messageType()) { switch (frame.messageType()) {
case PINGREQ.TYPE: { case PINGREQ.TYPE: {
LOG.debug("Received a ping from client: " + getClientId()); LOG.debug("Received a ping from client: " + getClientId());
@ -240,7 +240,6 @@ public class MQTTProtocolConverter {
} }
void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
if (connected.get()) { if (connected.get()) {
throw new MQTTProtocolException("Already connected."); throw new MQTTProtocolException("Already connected.");
} }
@ -333,7 +332,7 @@ public class MQTTProtocolConverter {
getMQTTTransport().sendToMQTT(ack.encode()); getMQTTTransport().sendToMQTT(ack.encode());
List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId()); List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
if( connect.cleanSession() ) { if (connect.cleanSession()) {
packetIdGenerator.stopClientSession(getClientId()); packetIdGenerator.stopClientSession(getClientId());
deleteDurableSubs(subs); deleteDurableSubs(subs);
} else { } else {
@ -417,18 +416,16 @@ public class MQTTProtocolConverter {
final QoS topicQoS = topic.qos(); final QoS topicQoS = topic.qos();
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName)); ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName));
if( mqttSubscriptionByTopic.containsKey(topicName)) { if (mqttSubscriptionByTopic.containsKey(topicName)) {
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName); final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
if (topicQoS != mqttSubscription.qos()) { if (topicQoS != mqttSubscription.qos()) {
// remove old subscription as the QoS has changed // remove old subscription as the QoS has changed
onUnSubscribe(topicName); onUnSubscribe(topicName);
} else { } else {
// duplicate SUBSCRIBE packet, find all matching topics and resend retained messages // duplicate SUBSCRIBE packet, find all matching topics and re-send retained messages
resendRetainedMessages(topicName, destination, mqttSubscription); resendRetainedMessages(topicName, destination, mqttSubscription);
return (byte) topicQoS.ordinal(); return (byte) topicQoS.ordinal();
} }
onUnSubscribe(topicName);
} }
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
@ -438,7 +435,7 @@ public class MQTTProtocolConverter {
consumerInfo.setRetroactive(true); consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when cleansession is false // create durable subscriptions only when cleansession is false
if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { if (!connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
consumerInfo.setSubscriptionName(topicQoS + ":" + topicName); consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
} }
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
@ -914,7 +911,6 @@ public class MQTTProtocolConverter {
char[] chars = name.toCharArray(); char[] chars = name.toCharArray();
for (int i = 0; i < chars.length; i++) { for (int i = 0; i < chars.length; i++) {
switch(chars[i]) { switch(chars[i]) {
case '#': case '#':
chars[i] = '>'; chars[i] = '>';
break; break;
@ -928,14 +924,12 @@ public class MQTTProtocolConverter {
case '*': case '*':
chars[i] = '+'; chars[i] = '+';
break; break;
case '/': case '/':
chars[i] = '.'; chars[i] = '.';
break; break;
case '.': case '.':
chars[i] = '/'; chars[i] = '/';
break; break;
} }
} }
String rc = new String(chars); String rc = new String(chars);