From 9f513f88781667df1a2a4b0a85153059dc097295 Mon Sep 17 00:00:00 2001 From: hkesler Date: Sat, 12 Jan 2019 22:47:16 -0500 Subject: [PATCH] AMQ7135 : do a purge before deleting the destination --- .../broker/region/AbstractRegion.java | 3 ++ .../activemq/RemoveDestinationTest.java | 41 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index a18e7939ae..8dcb76ca1c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -297,6 +297,9 @@ public abstract class AbstractRegion implements Region { } } destinationMap.unsynchronizedRemove(destination, dest); + if (dest instanceof Queue){ + ((Queue) dest).purge(); + } dispose(context, dest); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); if (destinationInterceptor != null) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java index 632294cd2b..80d202ead5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java @@ -26,6 +26,7 @@ import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; @@ -35,6 +36,7 @@ import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; import org.junit.After; @@ -74,6 +76,45 @@ public class RemoveDestinationTest { return conn; } + @Test(timeout = 60000) + public void testRemoveQueue() throws Exception { + + ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true); + + final DestinationSource destinationSource = amqConnection.getDestinationSource(); + Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("TEST.FOO"); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + TextMessage msg = session.createTextMessage("Hellow World"); + producer.send(msg); + assertNotNull(consumer.receive(5000)); + final ActiveMQQueue amqQueue = (ActiveMQQueue) queue; + + consumer.close(); + producer.close(); + session.close(); + + assertTrue("Destination discovered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return destinationSource.getQueues().contains(amqQueue); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + + amqConnection.destroyDestination((ActiveMQDestination) queue); + + assertTrue("Destination is removed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !destinationSource.getQueues().contains(amqQueue); + } + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100))); + } + @Test(timeout = 60000) public void testRemoveDestinationWithoutSubscriber() throws Exception {