diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 3e645561e6..cc51ce7a89 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -112,9 +112,9 @@ public class MQTTProtocolConverter { private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); - private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); - private final Map activeMQTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); - private final Map mqttTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); + private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); + private final Map activeMQTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); + private final Map mqttTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); private final Set restoredSubs = Collections.synchronizedSet(new HashSet()); private final Map consumerAcks = new LRUCache(DEFAULT_CACHE_SIZE); @@ -413,9 +413,9 @@ public class MQTTProtocolConverter { byte onSubscribe(final Topic topic) throws MQTTProtocolException { - final UTF8Buffer topicName = topic.name(); + final String topicName = topic.name().toString(); final QoS topicQoS = topic.qos(); - ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString())); + ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName)); if( mqttSubscriptionByTopic.containsKey(topicName)) { final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName); @@ -439,7 +439,7 @@ public class MQTTProtocolConverter { consumerInfo.setDispatchAsync(true); // create durable subscriptions only when cleansession is false if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { - consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString()); + consumerInfo.setSubscriptionName(topicQoS + ":" + topicName); } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); @@ -471,7 +471,7 @@ public class MQTTProtocolConverter { return qos[0]; } - private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination, + private void resendRetainedMessages(String topicName, ActiveMQDestination destination, MQTTSubscription mqttSubscription) throws MQTTProtocolException { // check whether the Topic has been recovered in restoreDurableSubs // mark subscription available for recovery for duplicate subscription @@ -524,7 +524,7 @@ public class MQTTProtocolConverter { UTF8Buffer[] topics = command.topics(); if (topics != null) { for (UTF8Buffer topic : topics) { - onUnSubscribe(topic); + onUnSubscribe(topic.toString()); } } UNSUBACK ack = new UNSUBACK(); @@ -532,7 +532,7 @@ public class MQTTProtocolConverter { sendToMQTT(ack.encode()); } - void onUnSubscribe(UTF8Buffer topicName) { + void onUnSubscribe(String topicName) { MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName); if (subs != null) { ConsumerInfo info = subs.getConsumerInfo(); @@ -548,7 +548,7 @@ public class MQTTProtocolConverter { // check if the durable sub also needs to be removed if (subs.getConsumerInfo().getSubscriptionName() != null) { // also remove it from restored durable subscriptions set - restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString())); + restoredSubs.remove(convertMQTTToActiveMQ(topicName)); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); rsi.setConnectionId(connectionId); @@ -680,7 +680,7 @@ public class MQTTProtocolConverter { if (topic == null) { String topicName = convertMQTTToActiveMQ(command.topicName().toString()); topic = new ActiveMQTopic(topicName); - activeMQTopicMap.put(command.topicName(), topic); + activeMQTopicMap.put(command.topicName().toString(), topic); } } msg.setJMSDestination(topic); @@ -704,15 +704,15 @@ public class MQTTProtocolConverter { result.retain(true); } - UTF8Buffer topicName; + String topicName; synchronized (mqttTopicMap) { topicName = mqttTopicMap.get(message.getJMSDestination()); if (topicName == null) { - topicName = new UTF8Buffer(convertActiveMQToMQTT(message.getDestination().getPhysicalName())); + topicName = convertActiveMQToMQTT(message.getDestination().getPhysicalName()); mqttTopicMap.put(message.getJMSDestination(), topicName); } } - result.topicName(topicName); + result.topicName(new UTF8Buffer(topicName)); if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); @@ -962,9 +962,10 @@ public class MQTTProtocolConverter { /** * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one * The default = 1 - * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription + * + * @param activeMQSubscriptionPrefetch + * set the prefetch for the corresponding ActiveMQ subscription */ - public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; }