From c1ebbc1465998706b254bcf004548845c4a1ad1e Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 20 May 2011 15:16:51 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/jira/browse/AMQ-3312 Replace usage of CopyOnWriteArraySet in the MBean methods that don't require a thread safe collection. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1125427 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 49 +++-- .../usecases/LargeQueueSparseDeleteTest.java | 200 ++++++++++++++++++ 2 files changed, 224 insertions(+), 25 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 0f86d53d6f..f7e75df75c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; @@ -78,8 +77,8 @@ import org.slf4j.MDC; /** * The Queue is a List of MessageEntry objects that are dispatched to matching * subscriptions. - * - * + * + * */ public class Queue extends BaseDestination implements Task, UsageListener { protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); @@ -113,7 +112,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { private CountDownLatch consumersBeforeStartsLatch; private final AtomicLong pendingWakeups = new AtomicLong(); private boolean allConsumersExclusiveByDefault = false; - + private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { asyncWakeup(); @@ -369,7 +368,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { try { sub.add(context, this); - + // needs to be synchronized - so no contention with dispatching // consumersLock. consumersLock.writeLock().lock(); @@ -461,7 +460,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } else if (isAllConsumersExclusiveByDefault()) { Subscription exclusiveConsumer = null; for (Subscription s : consumers) { - if (exclusiveConsumer == null + if (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer .getConsumerInfo().getPriority()) { exclusiveConsumer = s; @@ -670,11 +669,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { Exception { final ConnectionContext context = producerExchange.getConnectionContext(); Future result = null; - + checkUsage(context, message); sendLock.lockInterruptibly(); try { - if (store != null && message.isPersistent()) { + if (store != null && message.isPersistent()) { message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); if (messages.isCacheEnabled()) { result = store.asyncAddQueueMessage(context, message); @@ -690,7 +689,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // a big TX does not blow up // our memory. This increment is decremented once the tx finishes.. message.incrementReferenceCount(); - + context.getTransaction().addSynchronization(new Synchronization() { @Override public void afterCommit() throws Exception { @@ -749,11 +748,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) { final String logMessage = "Usage Manager Temp Store is Full (" - + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit() + + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit() +"). Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; - + waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage); } } @@ -1154,7 +1153,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Removes the messages matching the given selector - * + * * @return the number of messages removed */ public int removeMatchingMessages(String selector) throws Exception { @@ -1164,7 +1163,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Removes the messages matching the given selector up to the maximum number * of matched messages - * + * * @return the number of messages removed */ public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { @@ -1174,12 +1173,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Removes the messages matching the given filter up to the maximum number * of matched messages - * + * * @return the number of messages removed */ public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { int movedCounter = 0; - Set set = new CopyOnWriteArraySet(); + Set set = new HashSet(); ConnectionContext context = createConnectionContext(); do { doPageIn(true); @@ -1215,7 +1214,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Copies the messages matching the given selector - * + * * @return the number of messages copied */ public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) @@ -1226,7 +1225,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Copies the messages matching the given selector up to the maximum number * of matched messages - * + * * @return the number of messages copied */ public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, @@ -1237,14 +1236,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Copies the messages matching the given filter up to the maximum number of * matched messages - * + * * @return the number of messages copied */ public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int movedCounter = 0; int count = 0; - Set set = new CopyOnWriteArraySet(); + Set set = new HashSet(); do { int oldMaxSize = getMaxPageSize(); setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); @@ -1311,7 +1310,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Moves the messages matching the given selector - * + * * @return the number of messages removed */ public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) @@ -1335,7 +1334,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int movedCounter = 0; - Set set = new CopyOnWriteArraySet(); + Set set = new HashSet(); do { doPageIn(true); pagedInMessagesLock.readLock().lock(); @@ -1632,8 +1631,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { messagesLock.writeLock().unlock(); } } - - final void messageSent(final ConnectionContext context, final Message msg) throws Exception { + + final void messageSent(final ConnectionContext context, final Message msg) throws Exception { destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); messageDelivered(context, msg); @@ -1751,7 +1750,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { try { resultList = new ArrayList(result.size()); for (QueueMessageReference ref : result) { - if (!pagedInMessages.containsKey(ref.getMessageId())) { + if (!pagedInMessages.containsKey(ref.getMessageId())) { pagedInMessages.put(ref.getMessageId(), ref); resultList.add(ref); } else { @@ -2065,7 +2064,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Find a consumer that matches the id in the message dispatch notification - * + * * @param messageDispatchNotification * @return sub or null if the subscription has been removed before dispatch * @throws JMSException diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java new file mode 100644 index 0000000000..8b6e698e07 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usecases; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.ConnectionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Assert; + +/** + * This unit test creates a fixed size queue and moves the last message in the + * queue to another queue. The test is used to very the performance of + * {@link org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(org.apache.activemq.broker.ConnectionContext, String, org.apache.activemq.command.ActiveMQDestination)}. + */ +public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(LargeQueueSparseDeleteTest.class); + + /** + * {@inheritDoc} + */ + @Override + protected void setUp() throws Exception { + super.useTopic = false; + super.setUp(); + } + + /** + * The test queue is filled with QUEUE_SIZE test messages, each with a + * numeric id property beginning at 0. Once the queue is filled, the last + * message (id = QUEUE_SIZE-1) is moved to another queue. The test succeeds + * if the move completes within TEST_TIMEOUT milliseconds. + * + * @throws Exception + */ + public void testMoveMessages() throws Exception { + final int QUEUE_SIZE = 30000; + final String MOVE_TO_DESTINATION_NAME = getDestinationString() + + ".dest"; + final long TEST_TIMEOUT = 6000; + + // Populate a test queue with uniquely-identifiable messages. + Connection conn = createConnection(); + try { + conn.start(); + Session session = conn.createSession(true, + Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < QUEUE_SIZE; i++) { + Message message = session.createMessage(); + message.setIntProperty("id", i); + producer.send(message); + } + session.commit(); + } finally { + conn.close(); + } + + // Access the implementation of the test queue and move the last message + // to another queue. Verify that the move occurred within the limits of + // the test. + Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( + destination); + + ConnectionContext context = new ConnectionContext( + new NonCachedMessageEvaluationContext()); + context.setBroker(broker.getBroker()); + context.getMessageEvaluationContext().setDestination(destination); + + long startTimeMillis = System.currentTimeMillis(); + Assert.assertEquals(1, queue + .moveMatchingMessagesTo(context, "id=" + (QUEUE_SIZE - 1), + createDestination(MOVE_TO_DESTINATION_NAME))); + + long durationMillis = System.currentTimeMillis() - startTimeMillis; + + LOG.info("It took " + durationMillis + + "ms to move the last message from a queue a " + QUEUE_SIZE + + " messages."); + + Assert.assertTrue("Moving the message took too long: " + durationMillis + + "ms", durationMillis < TEST_TIMEOUT); + } + + public void testCopyMessages() throws Exception { + final int QUEUE_SIZE = 30000; + final String MOVE_TO_DESTINATION_NAME = getDestinationString() + + ".dest"; + final long TEST_TIMEOUT = 6000; + + // Populate a test queue with uniquely-identifiable messages. + Connection conn = createConnection(); + try { + conn.start(); + Session session = conn.createSession(true, + Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < QUEUE_SIZE; i++) { + Message message = session.createMessage(); + message.setIntProperty("id", i); + producer.send(message); + } + session.commit(); + } finally { + conn.close(); + } + + // Access the implementation of the test queue and move the last message + // to another queue. Verify that the move occurred within the limits of + // the test. + Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( + destination); + + ConnectionContext context = new ConnectionContext( + new NonCachedMessageEvaluationContext()); + context.setBroker(broker.getBroker()); + context.getMessageEvaluationContext().setDestination(destination); + + long startTimeMillis = System.currentTimeMillis(); + Assert.assertEquals(1, + queue.copyMatchingMessagesTo(context, "id=" + (QUEUE_SIZE - 1), createDestination(MOVE_TO_DESTINATION_NAME))); + + long durationMillis = System.currentTimeMillis() - startTimeMillis; + + LOG.info("It took " + durationMillis + + "ms to copy the last message from a queue a " + QUEUE_SIZE + + " messages."); + + Assert.assertTrue("Copying the message took too long: " + durationMillis + + "ms", durationMillis < TEST_TIMEOUT); + } + + public void testRemoveMessages() throws Exception { + final int QUEUE_SIZE = 30000; + final long TEST_TIMEOUT = 6000; + + // Populate a test queue with uniquely-identifiable messages. + Connection conn = createConnection(); + try { + conn.start(); + Session session = conn.createSession(true, + Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < QUEUE_SIZE; i++) { + Message message = session.createMessage(); + message.setIntProperty("id", i); + producer.send(message); + } + session.commit(); + } finally { + conn.close(); + } + + // Access the implementation of the test queue and move the last message + // to another queue. Verify that the move occurred within the limits of + // the test. + Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( + destination); + + ConnectionContext context = new ConnectionContext( + new NonCachedMessageEvaluationContext()); + context.setBroker(broker.getBroker()); + context.getMessageEvaluationContext().setDestination(destination); + + long startTimeMillis = System.currentTimeMillis(); + Assert.assertEquals(1, + queue.removeMatchingMessages("id=" + (QUEUE_SIZE - 1))); + + long durationMillis = System.currentTimeMillis() - startTimeMillis; + + LOG.info("It took " + durationMillis + + "ms to remove the last message from a queue a " + QUEUE_SIZE + + " messages."); + + Assert.assertTrue("Removing the message took too long: " + durationMillis + + "ms", durationMillis < TEST_TIMEOUT); + } +}