ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()

After a TX in stomp is committed, a session will never be cleared from ActiveMQServer
This commit is contained in:
Clebert Suconic 2022-06-17 15:33:14 -04:00 committed by clebertsuconic
parent 9dd026118e
commit c1fd16d66a
3 changed files with 89 additions and 25 deletions

View File

@ -19,10 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -63,7 +62,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
private final Executor executor; private final Executor executor;
private final Map<Object, StompSession> transactedSessions = new HashMap<>(); // connectionID / Map<SessionId, StompSession>
private final Map<Object, Map<Object, StompSession>> transactedSessions = new ConcurrentHashMap<>();
// key => connection ID, value => Stomp session // key => connection ID, value => Stomp session
private final Map<Object, StompSession> sessions = new HashMap<>(); private final Map<Object, StompSession> sessions = new HashMap<>();
@ -218,10 +218,29 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
} }
public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception { public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
return internalGetSession(connection, transactedSessions, txID, true); return internalGetSession(connection, getTXMap(connection.getID()), txID, true);
} }
public Map<Object, Map<Object, StompSession>> getTransactedSessions() {
return transactedSessions;
}
private Map<Object, StompSession> getTXMap(Object objectID) {
Map<Object, StompSession> sessions = transactedSessions.get(objectID);
if (sessions == null) {
sessions = new HashMap<>();
Map<Object, StompSession> oldValue = transactedSessions.putIfAbsent(objectID, sessions);
if (oldValue != null) {
sessions = oldValue;
}
}
return sessions;
}
private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception { private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception {
System.out.println("Looking for sessionID " + id);
StompSession stompSession = sessions.get(id); StompSession stompSession = sessions.get(id);
if (stompSession == null) { if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
@ -253,21 +272,18 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
} }
} }
// removed the transacted session belonging to the connection Map<Object, StompSession> sessionMap = getTXMap(connection.getID());
Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator(); sessionMap.values().forEach(ss -> {
while (iterator.hasNext()) { try {
Map.Entry<Object, StompSession> entry = iterator.next(); ss.getCoreSession().rollback(false);
if (entry.getValue().getConnection() == connection) { ss.getCoreSession().close(false);
ServerSession serverSession = entry.getValue().getCoreSession(); } catch (Exception e) {
try { ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
serverSession.rollback(true);
serverSession.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
}
iterator.remove();
} }
} });
sessionMap.clear();
transactedSessions.remove(connection.getID());
} }
}); });
} }
@ -323,20 +339,20 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
public void commitTransaction(StompConnection connection, String txID) throws Exception { public void commitTransaction(StompConnection connection, String txID) throws Exception {
StompSession session = getTransactedSession(connection, txID); StompSession session = getTransactedSession(connection, txID);
if (session == null) { if (session == null || !session.isTxPending()) {
throw new ActiveMQStompException(connection, "No transaction started: " + txID); throw new ActiveMQStompException(connection, "No transaction started: " + txID);
} }
transactedSessions.remove(txID);
session.getCoreSession().commit(); session.getCoreSession().commit();
session.end();
} }
public void abortTransaction(StompConnection connection, String txID) throws Exception { public void abortTransaction(StompConnection connection, String txID) throws Exception {
StompSession session = getTransactedSession(connection, txID); StompSession session = getTransactedSession(connection, txID);
if (session == null) { if (session == null || !session.isTxPending()) {
throw new ActiveMQStompException(connection, "No transaction started: " + txID); throw new ActiveMQStompException(connection, "No transaction started: " + txID);
} }
transactedSessions.remove(txID);
session.getCoreSession().rollback(false); session.getCoreSession().rollback(false);
session.end();
} }
public StompPostReceiptFunction subscribe(StompConnection connection, public StompPostReceiptFunction subscribe(StompConnection connection,
@ -374,12 +390,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
public void beginTransaction(StompConnection connection, String txID) throws Exception { public void beginTransaction(StompConnection connection, String txID) throws Exception {
ActiveMQServerLogger.LOGGER.debugf("-------------------------------Stomp begin tx: %s", txID); ActiveMQServerLogger.LOGGER.debugf("-------------------------------Stomp begin tx: %s", txID);
if (transactedSessions.containsKey(txID)) { // create the transacted session
StompSession session = getTransactedSession(connection, txID);
if (session.isTxPending()) {
ActiveMQServerLogger.LOGGER.stompErrorTXExists(txID); ActiveMQServerLogger.LOGGER.stompErrorTXExists(txID);
throw new ActiveMQStompException(connection, "Transaction already started: " + txID); throw new ActiveMQStompException(connection, "Transaction already started: " + txID);
} }
// create the transacted session session.begin();
getTransactedSession(connection, txID);
} }
public boolean destinationExists(String destination) { public boolean destinationExists(String destination) {

View File

@ -72,6 +72,20 @@ public class StompSession implements SessionCallback {
private volatile boolean noLocal = false; private volatile boolean noLocal = false;
private boolean txPending = false;
public synchronized void begin() {
txPending = true;
}
public synchronized boolean isTxPending() {
return txPending;
}
public synchronized void end() {
txPending = false;
}
StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) { StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) {
this.connection = connection; this.connection = connection;
this.manager = manager; this.manager = manager;

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -60,6 +61,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider; import org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider; import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@ -662,6 +664,37 @@ public class StompTest extends StompTestBase {
} }
@Test
public void testTransactedSessionLeak() throws Exception {
for (int i = 0; i < 10; i++) {
conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass);
for (int s = 0; s < 10; s++) {
String txId = "tx" + i + "_" + s;
beginTransaction(conn, txId);
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, txId);
commitTransaction(conn, txId, true);
}
Wait.assertEquals(13, () -> server.getSessions().size(), 1000, 100);
conn.disconnect();
}
if (connection != null) {
connection.close();
}
Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100);
Acceptor stompAcceptor = server.getRemotingService().getAcceptors().get("stomp");
StompProtocolManager stompProtocolManager = (StompProtocolManager) stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
Assert.assertNotNull(stompProtocolManager);
Assert.assertEquals(0, stompProtocolManager.getTransactedSessions().size());
}
@Test @Test
public void testIngressTimestamp() throws Exception { public void testIngressTimestamp() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true)); server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));