diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6c846ba571..9654a3e9d0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1178,16 +1178,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se // Don't let new consumers or producers get added while we are closing // this down. session.shutdown(); - // Cascade the connection stop producers. - // we don't stop consumer because in core - // closing the session will do the job + for (ProducerId producerId : session.getProducerIds()) { - try { - processRemoveProducer(producerId); - } catch (Throwable e) { - // LOG.warn("Failed to remove producer: {}", producerId, e); - } + processRemoveProducer(producerId); } + + for (ConsumerId consumerId : session.getConsumerIds()) { + processRemoveConsumer(consumerId, lastDeliveredSequenceId); + } + state.removeSession(id); propagateLastSequenceId(session, lastDeliveredSequenceId); removeSession(context, session.getInfo()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 44cc8da241..efe7801168 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -682,4 +682,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } return mappedDestination; } + + public List getConnections() { + return connections; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 77efd28694..d2d1d0ce60 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -47,6 +47,7 @@ import javax.jms.XASession; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; @@ -64,12 +65,17 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.state.SessionState; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -244,8 +250,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { Connection connection = factory.createConnection(); - Collection sessions = new LinkedList<>(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); System.out.println("Queue:" + queue); @@ -270,6 +274,40 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testSessionCloseWithOpenConnection() throws Exception { + try (Connection connection = factory.createConnection()) { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + session.createConsumer(queue); + session.createConsumer(queue); + + connection.start(); + + Field infoField = ActiveMQSession.class.getDeclaredField("info"); + infoField.setAccessible(true); + SessionInfo info = (SessionInfo) infoField.get(session); + + NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("netty"); + OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) acceptor.getProtocolMap().get("OPENWIRE"); + + List connections = protocolManager.getConnections(); + assertEquals(1, connections.size()); + + OpenWireConnection conn = connections.get(0); + + SessionState sessionState = conn.getState().getSessionState(info.getSessionId()); + + Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000); + + session.close(); + + Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000); + } + } + @Test public void testRollback() throws Exception { try (Connection connection = factory.createConnection()) {