mirror of https://github.com/apache/activemq.git
Queue purge now acquires the sendLock to prevent new messages from coming in while purging. The statistics are no longer zeroed out as they should properly decrement as messages are removed. These changes should prevent the statistics from going negative.
This commit is contained in:
parent
0ad62f722f
commit
56bb079c82
|
@ -1237,6 +1237,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
public void purge() throws Exception {
|
||||
ConnectionContext c = createConnectionContext();
|
||||
List<MessageReference> list = null;
|
||||
try {
|
||||
sendLock.lock();
|
||||
long originalMessageCount = this.destinationStatistics.getMessages().getCount();
|
||||
do {
|
||||
doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed.
|
||||
|
@ -1261,9 +1263,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
if (this.destinationStatistics.getMessages().getCount() > 0) {
|
||||
LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
|
||||
}
|
||||
gc();
|
||||
this.destinationStatistics.getMessages().setCount(0);
|
||||
getMessages().clear();
|
||||
} finally {
|
||||
sendLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package org.apache.activemq.broker.region;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
@ -178,6 +180,54 @@ public class QueuePurgeTest extends CombinationTestSupport {
|
|||
testPurgeLargeQueueWithConsumer(true);
|
||||
}
|
||||
|
||||
public void testConcurrentPurgeAndSend() throws Exception {
|
||||
testConcurrentPurgeAndSend(false);
|
||||
}
|
||||
|
||||
public void testConcurrentPurgeAndSendPrioritizedMessages() throws Exception {
|
||||
testConcurrentPurgeAndSend(true);
|
||||
}
|
||||
|
||||
private void testConcurrentPurgeAndSend(boolean prioritizedMessages) throws Exception {
|
||||
applyBrokerSpoolingPolicy(false);
|
||||
createProducerAndSendMessages(NUM_TO_SEND / 2);
|
||||
QueueViewMBean proxy = getProxyToQueueViewMBean();
|
||||
createConsumer();
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutorService service = Executors.newFixedThreadPool(1);
|
||||
try {
|
||||
LOG.info("purging..");
|
||||
service.submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
proxy.purge();
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
|
||||
}
|
||||
});
|
||||
|
||||
//send should get blocked while purge is running
|
||||
//which should ensure the metrics are correct
|
||||
createProducerAndSendMessages(NUM_TO_SEND / 2);
|
||||
|
||||
Message msg;
|
||||
do {
|
||||
msg = consumer.receive(1000);
|
||||
if (msg != null) {
|
||||
msg.acknowledge();
|
||||
}
|
||||
} while (msg != null);
|
||||
assertEquals("Queue size not valid", 0, proxy.getQueueSize());
|
||||
assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
|
||||
} finally {
|
||||
service.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception {
|
||||
applyBrokerSpoolingPolicy(prioritizedMessages);
|
||||
createProducerAndSendMessages(NUM_TO_SEND);
|
||||
|
|
Loading…
Reference in New Issue