AMQ7135 : do a purge before deleting the destination

This commit is contained in:
hkesler 2019-01-12 22:47:16 -05:00
parent 5acd9303a5
commit 9f513f8878
2 changed files with 44 additions and 0 deletions

View File

@ -297,6 +297,9 @@ public abstract class AbstractRegion implements Region {
} }
} }
destinationMap.unsynchronizedRemove(destination, dest); destinationMap.unsynchronizedRemove(destination, dest);
if (dest instanceof Queue){
((Queue) dest).purge();
}
dispose(context, dest); dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) { if (destinationInterceptor != null) {

View File

@ -26,6 +26,7 @@ import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
@ -35,6 +36,7 @@ import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.After;
@ -74,6 +76,45 @@ public class RemoveDestinationTest {
return conn; return conn;
} }
@Test(timeout = 60000)
public void testRemoveQueue() throws Exception {
ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true);
final DestinationSource destinationSource = amqConnection.getDestinationSource();
Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
TextMessage msg = session.createTextMessage("Hellow World");
producer.send(msg);
assertNotNull(consumer.receive(5000));
final ActiveMQQueue amqQueue = (ActiveMQQueue) queue;
consumer.close();
producer.close();
session.close();
assertTrue("Destination discovered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return destinationSource.getQueues().contains(amqQueue);
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
amqConnection.destroyDestination((ActiveMQDestination) queue);
assertTrue("Destination is removed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !destinationSource.getQueues().contains(amqQueue);
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testRemoveDestinationWithoutSubscriber() throws Exception { public void testRemoveDestinationWithoutSubscriber() throws Exception {