mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-13 05:26:13 +00:00
ARTEMIS-1175 fix memory leak
(cherry picked from commit b136fed48f756071bbf9542e94ae6448c37881f2) (cherry picked from commit f63ffc7af5d96fd1a6785a12d5ce3ae504831269)
This commit is contained in:
parent
1cbadb08d7
commit
ee3669e422
@ -40,7 +40,7 @@ public class MQTTSessionState {
|
||||
|
||||
private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final Set<Integer> pubRec = new HashSet<>();
|
||||
private final Set<Integer> pubRec = new HashSet<>();
|
||||
|
||||
private boolean attached = false;
|
||||
|
||||
@ -127,7 +127,7 @@ public class MQTTSessionState {
|
||||
|
||||
public class OutboundStore {
|
||||
|
||||
private HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
|
||||
private HashMap<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new HashMap<>();
|
||||
|
||||
private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
|
||||
|
||||
@ -135,9 +135,13 @@ public class MQTTSessionState {
|
||||
|
||||
private final AtomicInteger ids = new AtomicInteger(0);
|
||||
|
||||
public int generateMqttId(long serverId, long consumerId) {
|
||||
private Pair<Long, Long> generateKey(long messageId, long consumerID) {
|
||||
return new Pair<>(messageId, consumerID);
|
||||
}
|
||||
|
||||
public int generateMqttId(long messageId, long consumerId) {
|
||||
synchronized (dataStoreLock) {
|
||||
Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId);
|
||||
Integer id = artemisToMqttMessageMap.get(generateKey(messageId, consumerId));
|
||||
if (id == null) {
|
||||
ids.compareAndSet(Short.MAX_VALUE, 1);
|
||||
id = ids.addAndGet(1);
|
||||
@ -146,18 +150,20 @@ public class MQTTSessionState {
|
||||
}
|
||||
}
|
||||
|
||||
public void publish(int mqtt, long serverId, long consumerId) {
|
||||
public void publish(int mqtt, long messageId, long consumerId) {
|
||||
synchronized (dataStoreLock) {
|
||||
artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt);
|
||||
mqttToServerIds.put(mqtt, new Pair(serverId, consumerId));
|
||||
Pair<Long, Long> key = generateKey(messageId, consumerId);
|
||||
artemisToMqttMessageMap.put(key, mqtt);
|
||||
mqttToServerIds.put(mqtt, key);
|
||||
}
|
||||
}
|
||||
|
||||
public Pair<Long, Long> publishAckd(int mqtt) {
|
||||
synchronized (dataStoreLock) {
|
||||
Pair p = mqttToServerIds.remove(mqtt);
|
||||
Pair p = mqttToServerIds.remove(mqtt);
|
||||
if (p != null) {
|
||||
mqttToServerIds.remove(p.getA());
|
||||
artemisToMqttMessageMap.remove(p);
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user