From 56bb079c8227a2beee609b205c001d66597db98a Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 24 Jan 2017 08:46:15 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-1940 Queue purge now acquires the sendLock to prevent new messages from coming in while purging. The statistics are no longer zeroed out as they should properly decrement as messages are removed. These changes should prevent the statistics from going negative. --- .../apache/activemq/broker/region/Queue.java | 48 +++++++++--------- .../broker/region/QueuePurgeTest.java | 50 +++++++++++++++++++ 2 files changed, 75 insertions(+), 23 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b841b89b11..6283232fb4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1237,33 +1237,35 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; - long originalMessageCount = this.destinationStatistics.getMessages().getCount(); - do { - doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. - pagedInMessagesLock.readLock().lock(); - try { - list = new ArrayList(pagedInMessages.values()); - }finally { - pagedInMessagesLock.readLock().unlock(); - } - - for (MessageReference ref : list) { + try { + sendLock.lock(); + long originalMessageCount = this.destinationStatistics.getMessages().getCount(); + do { + doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. + pagedInMessagesLock.readLock().lock(); try { - QueueMessageReference r = (QueueMessageReference) ref; - removeMessage(c, r); - } catch (IOException e) { + list = new ArrayList(pagedInMessages.values()); + }finally { + pagedInMessagesLock.readLock().unlock(); } - } - // don't spin/hang if stats are out and there is nothing left in the - // store - } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); - if (this.destinationStatistics.getMessages().getCount() > 0) { - LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + for (MessageReference ref : list) { + try { + QueueMessageReference r = (QueueMessageReference) ref; + removeMessage(c, r); + } catch (IOException e) { + } + } + // don't spin/hang if stats are out and there is nothing left in the + // store + } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); + + if (this.destinationStatistics.getMessages().getCount() > 0) { + LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + } + } finally { + sendLock.unlock(); } - gc(); - this.destinationStatistics.getMessages().setCount(0); - getMessages().clear(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index 85faeab7f9..30b2cb23c5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region; import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; @@ -178,6 +180,54 @@ public class QueuePurgeTest extends CombinationTestSupport { testPurgeLargeQueueWithConsumer(true); } + public void testConcurrentPurgeAndSend() throws Exception { + testConcurrentPurgeAndSend(false); + } + + public void testConcurrentPurgeAndSendPrioritizedMessages() throws Exception { + testConcurrentPurgeAndSend(true); + } + + private void testConcurrentPurgeAndSend(boolean prioritizedMessages) throws Exception { + applyBrokerSpoolingPolicy(false); + createProducerAndSendMessages(NUM_TO_SEND / 2); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + createConsumer(); + long start = System.currentTimeMillis(); + ExecutorService service = Executors.newFixedThreadPool(1); + try { + LOG.info("purging.."); + service.submit(new Runnable() { + + @Override + public void run() { + try { + proxy.purge(); + } catch (Exception e) { + fail(e.getMessage()); + } + LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms"); + } + }); + + //send should get blocked while purge is running + //which should ensure the metrics are correct + createProducerAndSendMessages(NUM_TO_SEND / 2); + + Message msg; + do { + msg = consumer.receive(1000); + if (msg != null) { + msg.acknowledge(); + } + } while (msg != null); + assertEquals("Queue size not valid", 0, proxy.getQueueSize()); + assertEquals("Found messages when browsing", 0, proxy.browseMessages().size()); + } finally { + service.shutdownNow(); + } + } + private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception { applyBrokerSpoolingPolicy(prioritizedMessages); createProducerAndSendMessages(NUM_TO_SEND);