From c1fd16d66a506d7878f0da6b71dfeaa096221d8c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 17 Jun 2022 15:33:14 -0400 Subject: [PATCH] ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions() After a TX in stomp is committed, a session will never be cleared from ActiveMQServer --- .../protocol/stomp/StompProtocolManager.java | 67 ++++++++++++------- .../core/protocol/stomp/StompSession.java | 14 ++++ .../tests/integration/stomp/StompTest.java | 33 +++++++++ 3 files changed, 89 insertions(+), 25 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 87d5c28ed7..1f9005a1c4 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -19,10 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; @@ -63,7 +62,8 @@ public class StompProtocolManager extends AbstractProtocolManager transactedSessions = new HashMap<>(); + // connectionID / Map + private final Map> transactedSessions = new ConcurrentHashMap<>(); // key => connection ID, value => Stomp session private final Map sessions = new HashMap<>(); @@ -218,10 +218,29 @@ public class StompProtocolManager extends AbstractProtocolManager> getTransactedSessions() { + return transactedSessions; + } + + private Map getTXMap(Object objectID) { + Map sessions = transactedSessions.get(objectID); + if (sessions == null) { + sessions = new HashMap<>(); + Map oldValue = transactedSessions.putIfAbsent(objectID, sessions); + if (oldValue != null) { + sessions = oldValue; + } + } + + return sessions; + } + + private StompSession internalGetSession(StompConnection connection, Map sessions, Object id, boolean transacted) throws Exception { + System.out.println("Looking for sessionID " + id); StompSession stompSession = sessions.get(id); if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); @@ -253,21 +272,18 @@ public class StompProtocolManager extends AbstractProtocolManager> iterator = transactedSessions.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().getConnection() == connection) { - ServerSession serverSession = entry.getValue().getCoreSession(); - try { - serverSession.rollback(true); - serverSession.close(false); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e); - } - iterator.remove(); + Map sessionMap = getTXMap(connection.getID()); + sessionMap.values().forEach(ss -> { + try { + ss.getCoreSession().rollback(false); + ss.getCoreSession().close(false); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e); } - } + }); + sessionMap.clear(); + + transactedSessions.remove(connection.getID()); } }); } @@ -323,20 +339,20 @@ public class StompProtocolManager extends AbstractProtocolManager server.getSessions().size(), 1000, 100); + conn.disconnect(); + } + + if (connection != null) { + connection.close(); + } + + Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100); + + Acceptor stompAcceptor = server.getRemotingService().getAcceptors().get("stomp"); + StompProtocolManager stompProtocolManager = (StompProtocolManager) stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP"); + Assert.assertNotNull(stompProtocolManager); + + Assert.assertEquals(0, stompProtocolManager.getTransactedSessions().size()); + } + @Test public void testIngressTimestamp() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));