[AMQ-6703] have jmx purge clear the audit such that messages can be copied back, fix and test

This commit is contained in:
gtully 2017-06-15 10:46:23 +01:00
parent 2490c85fc5
commit 99f3d4c505
2 changed files with 41 additions and 0 deletions

View File

@ -1280,6 +1280,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// store
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
getMessages().getMessageAudit().clear();
if (this.destinationStatistics.getMessages().getCount() > 0) {
LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
}

View File

@ -438,6 +438,45 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testCopyPurgeCopyBack() throws Exception {
connection = connectionFactory.createConnection();
final int numMessages = 100;
useConnection(connection, numMessages);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queueT = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String newDestination = getSecondDestinationString();
long queueSize = queueT.getQueueSize();
assertTrue(queueSize > 0);
int c = queueT.copyMatchingMessagesTo(null, newDestination);
LOG.info("Copied: " + c);
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
QueueViewMBean queueD = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
LOG.info("Queue: " + queueD.getName() + " now has: " + queueD.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueD.getQueueSize(), numMessages, queueD.getQueueSize());
LOG.info("Queue: " + queueT.getName() + " now has: " + queueT.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueT.getQueueSize(), numMessages, queueT.getQueueSize());
queueT.purge();
queueD.copyMatchingMessagesTo(null, getDestinationString());
LOG.info("Queue: " + queueD.getName() + " now has: " + queueD.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueD.getQueueSize(), numMessages, queueD.getQueueSize());
LOG.info("Queue: " + queueT.getName() + " now has: " + queueT.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueT.getQueueSize(), numMessages, queueT.getQueueSize());
assertNotRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
}
public void testCreateDestinationWithSpacesAtEnds() throws Exception {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);