Reduce memory overhead of the MQTT Protocol converter by not storing the
UTF8Buffer instances and instead simply store the needed String values.
We always access the String value anyway so all of the UTF8Buffer object
we store carry the overhead of both marshalled UTF-8 bytes and an
unmarshalled String object.
This commit is contained in:
Timothy Bish 2014-07-30 12:00:23 -04:00
parent 4b0e3e57ba
commit 2cd54248c6
1 changed files with 17 additions and 16 deletions

View File

@ -112,9 +112,9 @@ public class MQTTProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>(); private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>(); private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE); private final Map<String, ActiveMQTopic> activeMQTopicMap = new LRUCache<String, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE); private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>()); private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
@ -413,9 +413,9 @@ public class MQTTProtocolConverter {
byte onSubscribe(final Topic topic) throws MQTTProtocolException { byte onSubscribe(final Topic topic) throws MQTTProtocolException {
final UTF8Buffer topicName = topic.name(); final String topicName = topic.name().toString();
final QoS topicQoS = topic.qos(); final QoS topicQoS = topic.qos();
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString())); ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName));
if( mqttSubscriptionByTopic.containsKey(topicName)) { if( mqttSubscriptionByTopic.containsKey(topicName)) {
final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName); final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
@ -439,7 +439,7 @@ public class MQTTProtocolConverter {
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when cleansession is false // create durable subscriptions only when cleansession is false
if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString()); consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
} }
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
@ -471,7 +471,7 @@ public class MQTTProtocolConverter {
return qos[0]; return qos[0];
} }
private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination, private void resendRetainedMessages(String topicName, ActiveMQDestination destination,
MQTTSubscription mqttSubscription) throws MQTTProtocolException { MQTTSubscription mqttSubscription) throws MQTTProtocolException {
// check whether the Topic has been recovered in restoreDurableSubs // check whether the Topic has been recovered in restoreDurableSubs
// mark subscription available for recovery for duplicate subscription // mark subscription available for recovery for duplicate subscription
@ -524,7 +524,7 @@ public class MQTTProtocolConverter {
UTF8Buffer[] topics = command.topics(); UTF8Buffer[] topics = command.topics();
if (topics != null) { if (topics != null) {
for (UTF8Buffer topic : topics) { for (UTF8Buffer topic : topics) {
onUnSubscribe(topic); onUnSubscribe(topic.toString());
} }
} }
UNSUBACK ack = new UNSUBACK(); UNSUBACK ack = new UNSUBACK();
@ -532,7 +532,7 @@ public class MQTTProtocolConverter {
sendToMQTT(ack.encode()); sendToMQTT(ack.encode());
} }
void onUnSubscribe(UTF8Buffer topicName) { void onUnSubscribe(String topicName) {
MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName); MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
if (subs != null) { if (subs != null) {
ConsumerInfo info = subs.getConsumerInfo(); ConsumerInfo info = subs.getConsumerInfo();
@ -548,7 +548,7 @@ public class MQTTProtocolConverter {
// check if the durable sub also needs to be removed // check if the durable sub also needs to be removed
if (subs.getConsumerInfo().getSubscriptionName() != null) { if (subs.getConsumerInfo().getSubscriptionName() != null) {
// also remove it from restored durable subscriptions set // also remove it from restored durable subscriptions set
restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString())); restoredSubs.remove(convertMQTTToActiveMQ(topicName));
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId); rsi.setConnectionId(connectionId);
@ -680,7 +680,7 @@ public class MQTTProtocolConverter {
if (topic == null) { if (topic == null) {
String topicName = convertMQTTToActiveMQ(command.topicName().toString()); String topicName = convertMQTTToActiveMQ(command.topicName().toString());
topic = new ActiveMQTopic(topicName); topic = new ActiveMQTopic(topicName);
activeMQTopicMap.put(command.topicName(), topic); activeMQTopicMap.put(command.topicName().toString(), topic);
} }
} }
msg.setJMSDestination(topic); msg.setJMSDestination(topic);
@ -704,15 +704,15 @@ public class MQTTProtocolConverter {
result.retain(true); result.retain(true);
} }
UTF8Buffer topicName; String topicName;
synchronized (mqttTopicMap) { synchronized (mqttTopicMap) {
topicName = mqttTopicMap.get(message.getJMSDestination()); topicName = mqttTopicMap.get(message.getJMSDestination());
if (topicName == null) { if (topicName == null) {
topicName = new UTF8Buffer(convertActiveMQToMQTT(message.getDestination().getPhysicalName())); topicName = convertActiveMQToMQTT(message.getDestination().getPhysicalName());
mqttTopicMap.put(message.getJMSDestination(), topicName); mqttTopicMap.put(message.getJMSDestination(), topicName);
} }
} }
result.topicName(topicName); result.topicName(new UTF8Buffer(topicName));
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
@ -962,9 +962,10 @@ public class MQTTProtocolConverter {
/** /**
* set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
* The default = 1 * The default = 1
* @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription *
* @param activeMQSubscriptionPrefetch
* set the prefetch for the corresponding ActiveMQ subscription
*/ */
public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
} }