ARTEMIS-1175 Fixing Memory Leak

This commit is contained in:
Clebert Suconic 2017-05-25 10:33:19 -04:00 committed by Martyn Taylor
parent b136fed48f
commit f63ffc7af5
1 changed files with 14 additions and 9 deletions

View File

@ -41,7 +41,7 @@ public class MQTTSessionState {
private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>();
private final Set<Integer> pubRec = new HashSet<>(); private final Set<Integer> pubRec = new HashSet<>();
private boolean attached = false; private boolean attached = false;
@ -128,7 +128,7 @@ public class MQTTSessionState {
public class OutboundStore { public class OutboundStore {
private HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>(); private HashMap<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new HashMap<>();
private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>(); private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
@ -136,9 +136,13 @@ public class MQTTSessionState {
private final AtomicInteger ids = new AtomicInteger(0); private final AtomicInteger ids = new AtomicInteger(0);
public int generateMqttId(long serverId, long consumerId) { private Pair<Long, Long> generateKey(long messageId, long consumerID) {
return new Pair<>(messageId, consumerID);
}
public int generateMqttId(long messageId, long consumerId) {
synchronized (dataStoreLock) { synchronized (dataStoreLock) {
Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId); Integer id = artemisToMqttMessageMap.get(generateKey(messageId, consumerId));
if (id == null) { if (id == null) {
ids.compareAndSet(Short.MAX_VALUE, 1); ids.compareAndSet(Short.MAX_VALUE, 1);
id = ids.addAndGet(1); id = ids.addAndGet(1);
@ -147,19 +151,20 @@ public class MQTTSessionState {
} }
} }
public void publish(int mqtt, long serverId, long consumerId) { public void publish(int mqtt, long messageId, long consumerId) {
synchronized (dataStoreLock) { synchronized (dataStoreLock) {
artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt); Pair<Long, Long> key = generateKey(messageId, consumerId);
mqttToServerIds.put(mqtt, new Pair(serverId, consumerId)); artemisToMqttMessageMap.put(key, mqtt);
mqttToServerIds.put(mqtt, key);
} }
} }
public Pair<Long, Long> publishAckd(int mqtt) { public Pair<Long, Long> publishAckd(int mqtt) {
synchronized (dataStoreLock) { synchronized (dataStoreLock) {
Pair p = mqttToServerIds.remove(mqtt); Pair p = mqttToServerIds.remove(mqtt);
if (p != null) { if (p != null) {
mqttToServerIds.remove(p.getA()); mqttToServerIds.remove(p.getA());
artemisToMqttMessageMap.remove(p.getB() + ":" + p.getA()); artemisToMqttMessageMap.remove(p);
} }
return p; return p;
} }