diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index b5b5a96765..a235a53689 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -1620,7 +1620,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active Set serverConsumers = session.getServerConsumers(); for (ServerConsumer serverConsumer : serverConsumers) { if (serverConsumer.sequentialID() == Long.valueOf(ID)) { - serverConsumer.close(true); serverConsumer.disconnect(); return true; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index fde247c7f1..8c2b6c25c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.artemis.tests.integration.management; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.Session; import javax.json.JsonArray; import javax.json.JsonObject; import javax.transaction.xa.XAResource; @@ -25,6 +29,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -46,6 +51,8 @@ import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.RoleInfo; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; @@ -63,6 +70,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jms.client.ActiveMQSession; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -1981,7 +1989,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { } @Test - public void testCloseConsumer() throws Exception { + public void testCloseCOREclient() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); SimpleString name = RandomUtil.randomSimpleString(); boolean durable = true; @@ -1995,21 +2003,49 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ServerLocator receiveLocator = createInVMNonHALocator(); ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator); ClientSession receiveClientSession = receiveCsf.createSession(true, false, false); - final ClientConsumer consumer = receiveClientSession.createConsumer(name); - final ClientProducer producer = receiveClientSession.createProducer(name); + final ClientConsumer COREclient = receiveClientSession.createConsumer(name); ServerSession ss = server.getSessions().iterator().next(); ServerConsumer sc = ss.getServerConsumers().iterator().next(); - producer.send(receiveClientSession.createMessage(true)); - consumer.receive(1000); - - Assert.assertFalse(consumer.isClosed()); + Assert.assertFalse(COREclient.isClosed()); serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID())); - Wait.waitFor(() -> consumer.isClosed(), 1000, 100); - Assert.assertTrue(consumer.isClosed()); + Wait.waitFor(() -> COREclient.isClosed()); + Assert.assertTrue(COREclient.isClosed()); } + @Test + public void testCloseJMSclient() throws Exception { + ActiveMQServerControl serverControl = createManagementControl(); + ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + Connection conn = cf.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic topic = ActiveMQJMSClient.createTopic("ConsumerTestTopic"); + + MessageConsumer JMSclient = session.createConsumer(topic, "test1"); + + + long clientID = -1; + String sessionID = ((ClientSessionImpl)(((ActiveMQSession)session).getCoreSession())).getName(); + + Set sessions = server.getSessions(); + for (ServerSession sess : sessions) { + if (sess.getName().equals(sessionID.toString())) { + Set serverConsumers = sess.getServerConsumers(); + for (ServerConsumer serverConsumer : serverConsumers) { + clientID = serverConsumer.sequentialID(); + } + } + } + + Assert.assertFalse(((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed()); + serverControl.closeConsumerWithID(sessionID, Long.toString(clientID)); + Wait.waitFor(() -> ((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed()); + Assert.assertTrue(((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed()); + } + + protected void scaleDown(ScaleDownHandler handler) throws Exception { SimpleString address = new SimpleString("testQueue"); HashMap params = new HashMap<>();