mirror of https://github.com/apache/activemq.git
fix for https://issues.apache.org/activemq/browse/AMQ-2209 - deleting queue does not remove messages
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@765141 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0c89959c59
commit
e6790674dc
|
@ -431,6 +431,7 @@ public abstract class BaseDestination implements Destination {
|
|||
|
||||
public void dispose(ConnectionContext context) throws IOException {
|
||||
if (this.store != null) {
|
||||
this.store.removeAllMessages(context);
|
||||
this.store.dispose(context);
|
||||
}
|
||||
this.destinationStatistics.setParent(null);
|
||||
|
|
|
@ -219,7 +219,11 @@ public abstract class CombinationTestSupport extends AutoFailTestSupport {
|
|||
}
|
||||
|
||||
public String getName() {
|
||||
if (options != null) {
|
||||
return getName(false);
|
||||
}
|
||||
|
||||
public String getName(boolean original) {
|
||||
if (options != null && !original) {
|
||||
return super.getName() + " " + options;
|
||||
}
|
||||
return super.getName();
|
||||
|
|
|
@ -25,9 +25,15 @@ import javax.management.MBeanServerInvocationHandler;
|
|||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import junit.framework.Test;
|
||||
import junit.textui.TestRunner;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -47,10 +53,15 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
|
|||
protected boolean transacted;
|
||||
protected int authMode = Session.AUTO_ACKNOWLEDGE;
|
||||
protected int messageCount = 10;
|
||||
public PersistenceAdapter persistenceAdapter;
|
||||
|
||||
public static void main(String[] args) {
|
||||
TestRunner.run(PurgeTest.class);
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(PurgeTest.class);
|
||||
}
|
||||
|
||||
public void testPurge() throws Exception {
|
||||
// Send some messages
|
||||
|
@ -91,6 +102,68 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
|
|||
count = proxy.getQueueSize();
|
||||
assertEquals("Queue size", count, 0);
|
||||
}
|
||||
|
||||
public void initCombosForTestDelete() {
|
||||
addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new JDBCPersistenceAdapter()});
|
||||
}
|
||||
|
||||
public void testDelete() throws Exception {
|
||||
// Send some messages
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.setClientID(clientID);
|
||||
connection.start();
|
||||
Session session = connection.createSession(transacted, authMode);
|
||||
destination = createDestination();
|
||||
sendMessages(session, messageCount);
|
||||
|
||||
// Now get the QueueViewMBean and purge
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
QueueViewMBean queueProxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||
BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
|
||||
|
||||
long count = queueProxy.getQueueSize();
|
||||
assertEquals("Queue size", count, messageCount);
|
||||
|
||||
brokerProxy.removeQueue(getDestinationString());
|
||||
|
||||
sendMessages(session, messageCount);
|
||||
|
||||
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
queueProxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
count = queueProxy.getQueueSize();
|
||||
assertEquals("Queue size", count, messageCount);
|
||||
|
||||
queueProxy.purge();
|
||||
|
||||
// Queues have a special case once there are more than a thousand
|
||||
// dead messages, make sure we hit that.
|
||||
messageCount += 1000;
|
||||
sendMessages(session, messageCount);
|
||||
|
||||
count = queueProxy.getQueueSize();
|
||||
assertEquals("Queue size", count, messageCount);
|
||||
|
||||
brokerProxy.removeQueue(getDestinationString());
|
||||
|
||||
sendMessages(session, messageCount);
|
||||
|
||||
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
queueProxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
count = queueProxy.getQueueSize();
|
||||
assertEquals("Queue size", count, messageCount);
|
||||
}
|
||||
|
||||
private void sendMessages(Session session, int count) throws Exception {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
Message message = session.createTextMessage("Message: " + i);
|
||||
producer.send(message);
|
||||
}
|
||||
}
|
||||
|
||||
protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
|
||||
ObjectName objectName = new ObjectName(name);
|
||||
|
@ -121,12 +194,20 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
|
|||
BrokerService answer = new BrokerService();
|
||||
answer.setUseJmx(true);
|
||||
answer.setEnableStatistics(true);
|
||||
answer.setPersistent(false);
|
||||
answer.addConnector(bindAddress);
|
||||
answer.setPersistenceAdapter(persistenceAdapter);
|
||||
answer.deleteAllMessages();
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected void echo(String text) {
|
||||
LOG.info(text);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the destination used in this test case
|
||||
*/
|
||||
protected String getDestinationString() {
|
||||
return getClass().getName() + "." + getName(true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue