mirror of https://github.com/apache/activemq.git
add info log message for a queue purge event
This commit is contained in:
parent
a0c42a61dd
commit
f19add11de
|
@ -1213,6 +1213,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
public void purge() throws Exception {
|
public void purge() throws Exception {
|
||||||
ConnectionContext c = createConnectionContext();
|
ConnectionContext c = createConnectionContext();
|
||||||
List<MessageReference> list = null;
|
List<MessageReference> list = null;
|
||||||
|
long originalMessageCount = this.destinationStatistics.getMessages().getCount();
|
||||||
do {
|
do {
|
||||||
doPageIn(true, false); // signal no expiry processing needed.
|
doPageIn(true, false); // signal no expiry processing needed.
|
||||||
pagedInMessagesLock.readLock().lock();
|
pagedInMessagesLock.readLock().lock();
|
||||||
|
@ -1234,7 +1235,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
|
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
|
||||||
|
|
||||||
if (this.destinationStatistics.getMessages().getCount() > 0) {
|
if (this.destinationStatistics.getMessages().getCount() > 0) {
|
||||||
LOG.warn("{} after purge complete, message count stats report: {}", getActiveMQDestination().getQualifiedName(), this.destinationStatistics.getMessages().getCount());
|
LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
|
||||||
|
} else {
|
||||||
|
LOG.info("{} purged of {} messages", getActiveMQDestination().getQualifiedName(), originalMessageCount);
|
||||||
}
|
}
|
||||||
gc();
|
gc();
|
||||||
this.destinationStatistics.getMessages().setCount(0);
|
this.destinationStatistics.getMessages().setCount(0);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.broker.region;
|
package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
@ -38,6 +39,10 @@ import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
|
import org.apache.activemq.util.DefaultTestAppender;
|
||||||
|
import org.apache.log4j.Appender;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -89,10 +94,39 @@ public class QueuePurgeTest extends CombinationTestSupport {
|
||||||
createProducerAndSendMessages(NUM_TO_SEND);
|
createProducerAndSendMessages(NUM_TO_SEND);
|
||||||
QueueViewMBean proxy = getProxyToQueueViewMBean();
|
QueueViewMBean proxy = getProxyToQueueViewMBean();
|
||||||
LOG.info("purging..");
|
LOG.info("purging..");
|
||||||
proxy.purge();
|
|
||||||
|
org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.region.Queue.class);
|
||||||
|
final AtomicBoolean gotPurgeLogMessage = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
Appender appender = new DefaultTestAppender() {
|
||||||
|
@Override
|
||||||
|
public void doAppend(LoggingEvent event) {
|
||||||
|
if (event.getMessage() instanceof String) {
|
||||||
|
String message = (String) event.getMessage();
|
||||||
|
if (message.contains("purged of " + NUM_TO_SEND +" messages")) {
|
||||||
|
LOG.info("Received a log message: {} ", event.getMessage());
|
||||||
|
gotPurgeLogMessage.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Level level = log4jLogger.getLevel();
|
||||||
|
log4jLogger.setLevel(Level.INFO);
|
||||||
|
log4jLogger.addAppender(appender);
|
||||||
|
try {
|
||||||
|
|
||||||
|
proxy.purge();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
log4jLogger.setLevel(level);
|
||||||
|
log4jLogger.removeAppender(appender);
|
||||||
|
}
|
||||||
|
|
||||||
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
|
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
|
||||||
proxy.getQueueSize());
|
proxy.getQueueSize());
|
||||||
assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
|
assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
|
||||||
|
assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
|
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue