From 95c4fdd4086ae522f1d2edfc537c06f820d10ac1 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 30 Sep 2016 16:36:00 +0100 Subject: [PATCH] ARTEMIS-762 Reflect management changes in AMQP protocol --- .../management/ActiveMQServerControl.java | 7 ++ .../management/impl/JMSTopicControlImpl.java | 2 +- .../amqp/broker/AMQPConnectionCallback.java | 2 +- .../amqp/broker/AMQPSessionCallback.java | 17 ++-- .../ActiveMQProtonRemotingConnection.java | 7 +- .../amqp/proton/AMQPConnectionContext.java | 7 +- .../protocol/amqp/proton/AmqpSupport.java | 3 + .../proton/ProtonServerSenderContext.java | 11 ++- .../amqp/proton/handler/ProtonHandler.java | 17 +++- .../impl/ActiveMQServerControlImpl.java | 10 ++- .../transport/amqp/client/AmqpSession.java | 14 ++- .../tests/integration/amqp/ProtonTest.java | 85 ++++++++++++++++++- .../management/ActiveMQServerControlTest.java | 26 ++++++ .../ActiveMQServerControlUsingCoreTest.java | 5 ++ 14 files changed, 190 insertions(+), 23 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 02819e5c0b..01a8d7450a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -511,6 +511,13 @@ public interface ActiveMQServerControl { @Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION) void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception; + /** + * Destroys the queue corresponding to the specified name. + */ + @Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION) + void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name, + @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception; + /** * Enables message counters for this server. */ diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java index d8c4179baf..cd8e4e05b2 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSTopicControlImpl.java @@ -238,7 +238,7 @@ public class JMSTopicControlImpl extends StandardMBean implements TopicControl { throw new IllegalArgumentException("No subscriptions with name " + queueName + " for clientID " + clientID); } ActiveMQServerControl serverControl = (ActiveMQServerControl) managementService.getResource(ResourceNames.CORE_SERVER); - serverControl.destroyQueue(queueName); + serverControl.destroyQueue(queueName, true); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 31abf87cb4..4ced546ce6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -117,7 +117,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { server.removeClientConnection(remoteContainerId); } connection.close(); - amqpConnection.close(); + amqpConnection.close(null); } finally { for (Transaction tx : transactions.values()) { try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 46ed1c9429..c7ca44611a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternal import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; @@ -59,12 +60,14 @@ import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.ProtonJMessage; +import org.jboss.logging.Logger; public class AMQPSessionCallback implements SessionCallback { + private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class); + protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); private final AMQPConnectionCallback protonSPI; @@ -467,9 +470,14 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void disconnect(ServerConsumer consumer, String queueName) { - synchronized (connection.getLock()) { - ((Link) consumer.getProtocolContext()).close(); - connection.flush(); + ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName); + try { + synchronized (connection.getLock()) { + ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec); + connection.flush(); + } + } catch (ActiveMQAMQPException e) { + logger.error("Error closing link for " + consumer.getQueue().getAddress()); } } @@ -504,5 +512,4 @@ public class AMQPSessionCallback implements SessionCallback { protonSPI.removeTransaction(txid); } - } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 7f129a1981..039da79f9b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -23,8 +23,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; /** * This is a Server's Connection representation used by ActiveMQ Artemis. @@ -103,6 +105,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection @Override public void disconnect(boolean criticalError) { + ErrorCondition errorCondition = new ErrorCondition(); + errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); + amqpConnection.close(errorCondition); getTransportConnection().close(); } @@ -111,7 +116,7 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection */ @Override public void disconnect(String scaleDownNodeID, boolean criticalError) { - getTransportConnection().close(); + disconnect(criticalError); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 3d79026c9d..70e4fd086b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; @@ -132,8 +133,8 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.flush(); } - public void close() { - handler.close(); + public void close(ErrorCondition errorCondition) { + handler.close(errorCondition); } protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException { @@ -264,7 +265,7 @@ public class AMQPConnectionContext extends ProtonInitializable { if (!connectionCallback.isSupportsAnonymous()) { connectionCallback.sendSASLSupported(); connectionCallback.close(); - handler.close(); + handler.close(null); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 13d7170c38..7bdbd2ee01 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -53,6 +53,9 @@ public class AmqpSupport { public static final Symbol PRODUCT = Symbol.valueOf("product"); public static final Symbol VERSION = Symbol.valueOf("version"); public static final Symbol PLATFORM = Symbol.valueOf("platform"); + public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted"); + public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced"); + // Symbols used in configuration of newly opened links. public static final Symbol COPY = Symbol.getSymbol("copy"); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 7ef49449f2..7d401fa06e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -169,7 +169,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); - queue = clientId + ":" + pubId; + queue = createQueueName(clientId, pubId); boolean exists = sessionSPI.queueQuery(queue, false).isExists(); /* @@ -207,7 +207,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); - queue = clientId + ":" + pubId; + queue = createQueueName(clientId, pubId); QueueQueryResult result = sessionSPI.queueQuery(queue, false); if (result.isExists()) { @@ -307,7 +307,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); - String queue = clientId + ":" + pubId; + String queue = createQueueName(clientId, pubId); result = sessionSPI.queueQuery(queue, false); if (result.isExists()) { if (result.getConsumerCount() > 0) { @@ -324,6 +324,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { Object message = delivery.getContext(); @@ -478,4 +479,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + private static String createQueueName(String clientId, String pubId) { + return clientId + "." + pubId; + } + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 0d667b01cf..3c088d58e7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -248,6 +248,10 @@ public class ProtonHandler extends ProtonInitializable { } public void flush() { + flush(false); + } + + private void flush(boolean wait) { synchronized (lock) { transport.process(); @@ -255,14 +259,21 @@ public class ProtonHandler extends ProtonInitializable { } - dispatchExecutor.execute(dispatchRunnable); + if (wait) { + dispatch(); + } else { + dispatchExecutor.execute(dispatchRunnable); + } } - public void close() { + public void close(ErrorCondition errorCondition) { synchronized (lock) { + if (errorCondition != null) { + connection.setCondition(errorCondition); + } connection.close(); } - flush(); + flush(true); } protected void checkServerSASL() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 48f43748bd..b96fcbfc31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -697,19 +697,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public void destroyQueue(final String name) throws Exception { + public void destroyQueue(final String name, final boolean removeConsumers) throws Exception { checkStarted(); clearIO(); try { SimpleString queueName = new SimpleString(name); - - server.destroyQueue(queueName, null, true); + server.destroyQueue(queueName, null, !removeConsumers, removeConsumers); } finally { blockOnIO(); } } + @Override + public void destroyQueue(final String name) throws Exception { + destroyQueue(name, false); + } + @Override public int getConnectionCount() { checkStarted(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 65a69b780a..936d4ef87c 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -207,10 +207,22 @@ public class AmqpSession extends AmqpAbstractResource { * @throws Exception if an error occurs while creating the receiver. */ public AmqpReceiver createReceiver(Source source) throws Exception { + return createReceiver(source, getNextReceiverId()); + } + + /** + * Create a receiver instance using the given Source + * + * @param source the caller created and configured Source used to create the receiver link. + * @param receiverId the receiver id to use. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception { checkClosed(); final ClientFuture request = new ClientFuture(); - final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); connection.getScheduler().execute(new Runnable() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index e90a8b180a..2a1e8c9c59 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -58,7 +58,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -71,6 +74,8 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -156,6 +161,7 @@ public class ProtonTest extends ProtonTestBase { server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false); + connection = createConnection(); } @@ -186,9 +192,9 @@ public class ProtonTest extends ProtonTestBase { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); myDurSub = session.createDurableSubscriber(topic, "myDurSub"); myDurSub.close(); - Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub"))); + Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub"))); session.unsubscribe("myDurSub"); - Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub"))); + Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub"))); session.close(); connection.close(); } finally { @@ -740,6 +746,81 @@ public class ProtonTest extends ProtonTestBase { assertTrue(expectedException.getMessage().contains("target address does not exist")); } + @Test + public void testLinkDetachSentWhenQueueDeleted() throws Exception { + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + final AmqpConnection amqpConnection = client.connect(); + try { + AmqpSession session = amqpConnection.createSession(); + + AmqpReceiver receiver = session.createReceiver(coreAddress); + server.destroyQueue(new SimpleString(coreAddress), null, false, true); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return amqpConnection.isClosed(); + } + }); + assertTrue(receiver.isClosed()); + } finally { + amqpConnection.close(); + } + } + + @Test + public void testCloseIsSentOnConnectionClose() throws Exception { + connection.close(); + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + final AmqpConnection amqpConnection = client.connect(); + try { + for (RemotingConnection connection : server.getRemotingService().getConnections()) { + server.getRemotingService().removeConnection(connection); + connection.disconnect(true); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return amqpConnection.isClosed(); + } + }); + + assertTrue(amqpConnection.isClosed()); + assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition()); + } finally { + amqpConnection.close(); + } + } + + + @Test + public void testClientIdIsSetInSubscriptionList() throws Exception { + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = client.createConnection(); + amqpConnection.setContainerId("testClient"); + amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic"))); + amqpConnection.connect(); + try { + AmqpSession session = amqpConnection.createSession(); + + Source source = new Source(); + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setCapabilities(Symbol.getSymbol("topic")); + source.setAddress("jms.topic.mytopic"); + AmqpReceiver receiver = session.createReceiver(source, "testSub"); + + SimpleString fo = new SimpleString("testClient.testSub:jms.topic.mytopic"); + assertNotNull(server.locateQueue(fo)); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + amqpConnection.close(); + } + } + @Test public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index df3cf5789a..86d19db8a5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -249,6 +249,32 @@ public class ActiveMQServerControlTest extends ManagementTestBase { checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); } + @Test + public void testCreateAndDestroyQueueClosingConsumers() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString name = RandomUtil.randomSimpleString(); + boolean durable = true; + + ActiveMQServerControl serverControl = createManagementControl(); + + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + + serverControl.createQueue(address.toString(), name.toString(), durable); + + ServerLocator receiveLocator = createInVMNonHALocator(); + ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator); + ClientSession receiveClientSession = receiveCsf.createSession(true, false, false); + ClientConsumer consumer = receiveClientSession.createConsumer(name); + + Assert.assertFalse(consumer.isClosed()); + + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + serverControl.destroyQueue(name.toString(), true); + Assert.assertTrue(consumer.isClosed()); + + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + } + @Test public void testCreateAndDestroyQueueWithNullFilter() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 69949b6b3f..777ddd2f1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -127,6 +127,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes proxy.invokeOperation("destroyQueue", name); } + @Override + public void destroyQueue(final String name, final boolean removeConsumers) throws Exception { + proxy.invokeOperation("destroyQueue", name, removeConsumers); + } + @Override public void disableMessageCounters() throws Exception { proxy.invokeOperation("disableMessageCounters");