From 05a93314cd038e0d929109e3adaf044f514fdc5f Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 17 Sep 2019 09:44:25 +0800 Subject: [PATCH] ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs When an openwire client closes the session, the broker doesn't clean up its server consumer references even though the core consumers are closed. This results a leak when sessions within a connection are created and closed when the connection keeps open. --- .../protocol/openwire/OpenWireConnection.java | 15 ++++--- .../openwire/OpenWireProtocolManager.java | 4 ++ .../openwire/SimpleOpenWireTest.java | 42 ++++++++++++++++++- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6c846ba571..9654a3e9d0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1178,16 +1178,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se // Don't let new consumers or producers get added while we are closing // this down. session.shutdown(); - // Cascade the connection stop producers. - // we don't stop consumer because in core - // closing the session will do the job + for (ProducerId producerId : session.getProducerIds()) { - try { - processRemoveProducer(producerId); - } catch (Throwable e) { - // LOG.warn("Failed to remove producer: {}", producerId, e); - } + processRemoveProducer(producerId); } + + for (ConsumerId consumerId : session.getConsumerIds()) { + processRemoveConsumer(consumerId, lastDeliveredSequenceId); + } + state.removeSession(id); propagateLastSequenceId(session, lastDeliveredSequenceId); removeSession(context, session.getInfo()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 44cc8da241..efe7801168 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -682,4 +682,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } return mappedDestination; } + + public List getConnections() { + return connections; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 77efd28694..d2d1d0ce60 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -47,6 +47,7 @@ import javax.jms.XASession; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; @@ -64,12 +65,17 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.state.SessionState; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -244,8 +250,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { Connection connection = factory.createConnection(); - Collection sessions = new LinkedList<>(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); System.out.println("Queue:" + queue); @@ -270,6 +274,40 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testSessionCloseWithOpenConnection() throws Exception { + try (Connection connection = factory.createConnection()) { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + session.createConsumer(queue); + session.createConsumer(queue); + + connection.start(); + + Field infoField = ActiveMQSession.class.getDeclaredField("info"); + infoField.setAccessible(true); + SessionInfo info = (SessionInfo) infoField.get(session); + + NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("netty"); + OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) acceptor.getProtocolMap().get("OPENWIRE"); + + List connections = protocolManager.getConnections(); + assertEquals(1, connections.size()); + + OpenWireConnection conn = connections.get(0); + + SessionState sessionState = conn.getState().getSessionState(info.getSessionId()); + + Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000); + + session.close(); + + Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000); + } + } + @Test public void testRollback() throws Exception { try (Connection connection = factory.createConnection()) {