From f63ffc7af5d96fd1a6785a12d5ce3ae504831269 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 25 May 2017 10:33:19 -0400 Subject: [PATCH] ARTEMIS-1175 Fixing Memory Leak --- .../core/protocol/mqtt/MQTTSessionState.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 74bc101c12..4b5c64820b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -41,7 +41,7 @@ public class MQTTSessionState { private final ConcurrentMap> addressMessageMap = new ConcurrentHashMap<>(); - private final Set pubRec = new HashSet<>(); + private final Set pubRec = new HashSet<>(); private boolean attached = false; @@ -128,7 +128,7 @@ public class MQTTSessionState { public class OutboundStore { - private HashMap artemisToMqttMessageMap = new HashMap<>(); + private HashMap, Integer> artemisToMqttMessageMap = new HashMap<>(); private HashMap> mqttToServerIds = new HashMap<>(); @@ -136,9 +136,13 @@ public class MQTTSessionState { private final AtomicInteger ids = new AtomicInteger(0); - public int generateMqttId(long serverId, long consumerId) { + private Pair generateKey(long messageId, long consumerID) { + return new Pair<>(messageId, consumerID); + } + + public int generateMqttId(long messageId, long consumerId) { synchronized (dataStoreLock) { - Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId); + Integer id = artemisToMqttMessageMap.get(generateKey(messageId, consumerId)); if (id == null) { ids.compareAndSet(Short.MAX_VALUE, 1); id = ids.addAndGet(1); @@ -147,19 +151,20 @@ public class MQTTSessionState { } } - public void publish(int mqtt, long serverId, long consumerId) { + public void publish(int mqtt, long messageId, long consumerId) { synchronized (dataStoreLock) { - artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt); - mqttToServerIds.put(mqtt, new Pair(serverId, consumerId)); + Pair key = generateKey(messageId, consumerId); + artemisToMqttMessageMap.put(key, mqtt); + mqttToServerIds.put(mqtt, key); } } public Pair publishAckd(int mqtt) { synchronized (dataStoreLock) { - Pair p = mqttToServerIds.remove(mqtt); + Pair p = mqttToServerIds.remove(mqtt); if (p != null) { mqttToServerIds.remove(p.getA()); - artemisToMqttMessageMap.remove(p.getB() + ":" + p.getA()); + artemisToMqttMessageMap.remove(p); } return p; }