diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b841b89b11..6283232fb4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1237,33 +1237,35 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; - long originalMessageCount = this.destinationStatistics.getMessages().getCount(); - do { - doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. - pagedInMessagesLock.readLock().lock(); - try { - list = new ArrayList(pagedInMessages.values()); - }finally { - pagedInMessagesLock.readLock().unlock(); - } - - for (MessageReference ref : list) { + try { + sendLock.lock(); + long originalMessageCount = this.destinationStatistics.getMessages().getCount(); + do { + doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. + pagedInMessagesLock.readLock().lock(); try { - QueueMessageReference r = (QueueMessageReference) ref; - removeMessage(c, r); - } catch (IOException e) { + list = new ArrayList(pagedInMessages.values()); + }finally { + pagedInMessagesLock.readLock().unlock(); } - } - // don't spin/hang if stats are out and there is nothing left in the - // store - } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); - if (this.destinationStatistics.getMessages().getCount() > 0) { - LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + for (MessageReference ref : list) { + try { + QueueMessageReference r = (QueueMessageReference) ref; + removeMessage(c, r); + } catch (IOException e) { + } + } + // don't spin/hang if stats are out and there is nothing left in the + // store + } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); + + if (this.destinationStatistics.getMessages().getCount() > 0) { + LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + } + } finally { + sendLock.unlock(); } - gc(); - this.destinationStatistics.getMessages().setCount(0); - getMessages().clear(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index 85faeab7f9..30b2cb23c5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region; import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; @@ -178,6 +180,54 @@ public class QueuePurgeTest extends CombinationTestSupport { testPurgeLargeQueueWithConsumer(true); } + public void testConcurrentPurgeAndSend() throws Exception { + testConcurrentPurgeAndSend(false); + } + + public void testConcurrentPurgeAndSendPrioritizedMessages() throws Exception { + testConcurrentPurgeAndSend(true); + } + + private void testConcurrentPurgeAndSend(boolean prioritizedMessages) throws Exception { + applyBrokerSpoolingPolicy(false); + createProducerAndSendMessages(NUM_TO_SEND / 2); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + createConsumer(); + long start = System.currentTimeMillis(); + ExecutorService service = Executors.newFixedThreadPool(1); + try { + LOG.info("purging.."); + service.submit(new Runnable() { + + @Override + public void run() { + try { + proxy.purge(); + } catch (Exception e) { + fail(e.getMessage()); + } + LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms"); + } + }); + + //send should get blocked while purge is running + //which should ensure the metrics are correct + createProducerAndSendMessages(NUM_TO_SEND / 2); + + Message msg; + do { + msg = consumer.receive(1000); + if (msg != null) { + msg.acknowledge(); + } + } while (msg != null); + assertEquals("Queue size not valid", 0, proxy.getQueueSize()); + assertEquals("Found messages when browsing", 0, proxy.browseMessages().size()); + } finally { + service.shutdownNow(); + } + } + private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception { applyBrokerSpoolingPolicy(prioritizedMessages); createProducerAndSendMessages(NUM_TO_SEND);