From 5619cd01a2cf3062f241a0e784ee71cc32314e2b Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 18 Feb 2010 23:49:41 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2610 - org.apache.activemq.broker.region.cursors.PendingMessageCursor.next() now increments the reference count before returning a message reference. this allows control over references when browsing or peeking rather than moving/removing git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@911650 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 2 + .../apache/activemq/broker/region/Queue.java | 31 +++-- .../broker/region/TopicSubscription.java | 6 +- .../region/cursors/AbstractStoreCursor.java | 3 + .../cursors/FilePendingMessageCursor.java | 3 +- .../region/cursors/PendingMessageCursor.java | 2 +- .../cursors/VMPendingMessageCursor.java | 3 + .../java/org/apache/activemq/usage/Usage.java | 2 +- .../apache/activemq/util/ThreadTracker.java | 24 ++-- .../java/org/apache/activemq/TestSupport.java | 34 +++++ .../broker/TopicSubscriptionTest.java | 24 ++++ .../region/cursors/StoreBasedCursorTest.java | 14 +- .../StoreQueueCursorNoDuplicateTest.java | 1 + .../usecases/ExpiredMessagesTest.java | 36 ++---- .../ExpiredMessagesWithNoConsumerTest.java | 10 +- .../usecases/UnlimitedEnqueueTest.java | 122 ++++++++++++++++++ 16 files changed, 260 insertions(+), 57 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index cb5c99b5ee..6e2cbeb355 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -164,6 +164,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { pending.reset(); while (pending.hasNext()) { MessageReference node = pending.next(); + node.decrementReferenceCount(); if (node.getMessageId().equals(mdn.getMessageId())) { // Synchronize between dispatched list and removal of messages from pending list // related to remove subscription action @@ -575,6 +576,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // related to remove subscription action synchronized(dispatchLock) { pending.remove(); + node.decrementReferenceCount(); if( !isDropped(node) && canDispatch(node)) { // Message may have been sitting in the pending diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 752540e886..b1b2f3206d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -808,32 +808,34 @@ public class Queue extends BaseDestination implements Task, UsageListener { } public Message[] browse() { - List l = new ArrayList(); - doBrowse(l, getMaxBrowsePageSize()); - return l.toArray(new Message[l.size()]); + List browseList = new ArrayList(); + doBrowse(browseList, getMaxBrowsePageSize()); + return browseList.toArray(new Message[browseList.size()]); } - public void doBrowse(List l, int max) { + public void doBrowse(List browseList, int max) { final ConnectionContext connectionContext = createConnectionContext(); try { pageInMessages(false); List toExpire = new ArrayList(); synchronized (dispatchMutex) { synchronized (pagedInPendingDispatch) { - addAll(pagedInPendingDispatch, l, max, toExpire); + addAll(pagedInPendingDispatch, browseList, max, toExpire); for (MessageReference ref : toExpire) { pagedInPendingDispatch.remove(ref); if (broker.isExpired(ref)) { + LOG.debug("expiring from pagedInPending: " + ref); messageExpired(connectionContext, ref); } } } toExpire.clear(); synchronized (pagedInMessages) { - addAll(pagedInMessages.values(), l, max, toExpire); + addAll(pagedInMessages.values(), browseList, max, toExpire); } for (MessageReference ref : toExpire) { if (broker.isExpired(ref)) { + LOG.debug("expiring from pagedInMessages: " + ref); messageExpired(connectionContext, ref); } else { synchronized (pagedInMessages) { @@ -842,23 +844,25 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - if (l.size() < getMaxBrowsePageSize()) { + if (browseList.size() < getMaxBrowsePageSize()) { synchronized (messages) { try { messages.reset(); - while (messages.hasNext() && l.size() < max) { + while (messages.hasNext() && browseList.size() < max) { MessageReference node = messages.next(); if (node.isExpired()) { if (broker.isExpired(node)) { + LOG.debug("expiring from messages: " + node); messageExpired(connectionContext, createMessageReference(node.getMessage())); } messages.remove(); } else { messages.rollback(node.getMessageId()); - if (l.contains(node.getMessage()) == false) { - l.add(node.getMessage()); + if (browseList.contains(node.getMessage()) == false) { + browseList.add(node.getMessage()); } } + node.decrementReferenceCount(); } } finally { messages.release(); @@ -897,6 +901,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { while (messages.hasNext()) { try { MessageReference r = messages.next(); + r.decrementReferenceCount(); messages.rollback(r.getMessageId()); if (msgId.equals(r.getMessageId())) { Message m = r.getMessage(); @@ -1444,12 +1449,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { messages.reset(); while (messages.hasNext() && count < toPageIn) { MessageReference node = messages.next(); - node.incrementReferenceCount(); messages.remove(); QueueMessageReference ref = createMessageReference(node.getMessage()); if (ref.isExpired()) { if (broker.isExpired(ref)) { messageExpired(createConnectionContext(), ref); + } else { + ref.decrementReferenceCount(); } } else { result.add(ref); @@ -1467,6 +1473,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (!pagedInMessages.containsKey(ref.getMessageId())) { pagedInMessages.put(ref.getMessageId(), ref); resultList.add(ref); + } else { + ref.decrementReferenceCount(); } } } @@ -1657,7 +1665,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { messages.reset(); while (messages.hasNext()) { MessageReference node = messages.next(); - node.incrementReferenceCount(); messages.remove(); if (messageId.equals(node.getMessageId())) { message = this.createMessageReference(node.getMessage()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 561175b015..a789ddca43 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -161,6 +161,7 @@ public class TopicSubscription extends AbstractSubscription { matched.reset(); while (matched.hasNext()) { MessageReference node = matched.next(); + node.decrementReferenceCount(); if (broker.isExpired(node)) { matched.remove(); dispatchedCounter.incrementAndGet(); @@ -181,6 +182,7 @@ public class TopicSubscription extends AbstractSubscription { matched.reset(); while (matched.hasNext()) { MessageReference node = matched.next(); + node.decrementReferenceCount(); if (node.getMessageId().equals(mdn.getMessageId())) { matched.remove(); dispatchedCounter.incrementAndGet(); @@ -384,8 +386,8 @@ public class TopicSubscription extends AbstractSubscription { matched.reset(); while (matched.hasNext() && !isFull()) { - MessageReference message = (MessageReference) matched - .next(); + MessageReference message = (MessageReference) matched.next(); + message.decrementReferenceCount(); matched.remove(); // Message may have been sitting in the matched list a // while diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index b1e003ff01..7f9bff7302 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -151,6 +151,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i result = this.iterator.next().getValue(); } last = result; + if (result != null) { + result.incrementReferenceCount(); + } return result; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 1b885161d4..73a710d854 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -162,6 +162,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple int count = 0; for (Iterator i = memoryList.iterator(); i.hasNext() && count < maxItems;) { MessageReference ref = i.next(); + ref.incrementReferenceCount(); result.add(ref); count++; } @@ -282,8 +283,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple // got from disk message.setRegionDestination(regionDestination); message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); - message.incrementReferenceCount(); } + message.incrementReferenceCount(); return message; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 22c12ed05c..55b4b19446 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -108,7 +108,7 @@ public interface PendingMessageCursor extends Service { boolean hasNext(); /** - * @return the next pending message + * @return the next pending message with its reference count increment */ MessageReference next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java index c54b6b90f3..7103e045a9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java @@ -117,6 +117,9 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { */ public synchronized MessageReference next() { last = (MessageReference)iter.next(); + if (last != null) { + last.incrementReferenceCount(); + } return last; } diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java index 3232e3f069..843325923b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java @@ -407,7 +407,7 @@ public abstract class Usage implements Service { } static { - executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "Usage Async Task"); thread.setDaemon(true); diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java index c5907e9d9a..2149e96714 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java @@ -26,13 +26,14 @@ public class ThreadTracker { * track the stack trace of callers * @param name the method being tracked */ - public static void track(String name) { + public static void track(final String name) { Tracker t; + final String key = name.intern(); synchronized(trackers) { - t = trackers.get(name); + t = trackers.get(key); if (t == null) { t = new Tracker(); - trackers.put(name, t); + trackers.put(key, t); } } t.track(); @@ -56,23 +57,30 @@ public class ThreadTracker { @SuppressWarnings("serial") class Trace extends Throwable { public int count = 1; - public final int size; + public final long id; Trace() { super(); - size = this.getStackTrace().length; + id = calculateIdentifier(); + } + private long calculateIdentifier() { + int len = 0; + for (int i=0; i { +class Tracker extends HashMap { public void track() { Trace current = new Trace(); synchronized(this) { - Trace exist = get(current.size); + Trace exist = get(current.id); if (exist != null) { exist.count++; } else { - put(current.size, current); + put(current.id, current); } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/TestSupport.java b/activemq-core/src/test/java/org/apache/activemq/TestSupport.java index d59bd21564..873e03fd83 100755 --- a/activemq-core/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/TestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq; import java.io.File; +import java.util.Map; import javax.jms.Connection; import javax.jms.Destination; @@ -25,6 +26,11 @@ import javax.jms.Message; import javax.jms.TextMessage; import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -139,4 +145,32 @@ public class TestSupport extends TestCase { recursiveDelete(new File(System.getProperty("derby.system.home"))); } } + + public static DestinationStatistics getDestinationStatistics(BrokerService broker, ActiveMQDestination destination) { + DestinationStatistics result = null; + org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination); + if (dest != null) { + result = dest.getDestinationStatistics(); + } + return result; + } + + public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) { + org.apache.activemq.broker.region.Destination result = null; + for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) { + if (dest.getName().equals(destination.getPhysicalName())) { + result = dest; + break; + } + } + return result; + } + + private static Map getDestinationMap(BrokerService target, + ActiveMQDestination destination) { + RegionBroker regionBroker = (RegionBroker) target.getRegionBroker(); + return destination.isQueue() ? + regionBroker.getQueueRegion().getDestinationMap() : + regionBroker.getTopicRegion().getDestinationMap(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java index 427316105a..bdb5767dc4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.broker; +import javax.jms.JMSException; + +import org.apache.activemq.TestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.ThreadTracker; + public class TopicSubscriptionTest extends QueueSubscriptionTest { protected void setUp() throws Exception { @@ -23,6 +29,11 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { durable = true; topic = true; } + + protected void tearDown() throws Exception { + super.tearDown(); + ThreadTracker.result(); + } public void testManyProducersManyConsumers() throws Exception { consumerCount = 40; @@ -34,6 +45,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * producerCount * consumerCount); + assertDestinationMemoryUsageGoesToZero(); } public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { @@ -46,6 +58,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); } public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { @@ -58,6 +71,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); } public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { @@ -82,6 +96,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); } public void testOneProducerManyConsumersFewMessages() throws Exception { @@ -94,6 +109,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); } public void testOneProducerManyConsumersManyMessages() throws Exception { @@ -106,6 +122,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); } @@ -119,5 +136,12 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest { doMultipleClientsTest(); assertTotalMessagesReceived(messageCount * producerCount * consumerCount); + assertDestinationMemoryUsageGoesToZero(); } + + private void assertDestinationMemoryUsageGoesToZero() throws Exception { + assertEquals("destination memory is back to 0", 0, + TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage()); + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java index ec0a5cf6fd..a330723060 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java @@ -24,35 +24,38 @@ package org.apache.activemq.broker.region.cursors; import java.util.Date; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import junit.framework.TestCase; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.usage.SystemUsage; -import junit.framework.TestCase; - public class StoreBasedCursorTest extends TestCase { protected String bindAddress = "tcp://localhost:60706"; BrokerService broker; - ConnectionFactory factory; + ActiveMQConnectionFactory factory; Connection connection; Session session; Queue queue; int messageSize = 1024; - int memoryLimit = 5 * messageSize; + // actual message is messageSize*2, and 4*MessageSize would allow 2 messages be delivered, but the flush of the cache is async so the flush + // triggered on 2nd message maxing out the usage may not be in effect for the 3rd message to succeed. Making the memory usage more lenient + // gives the usageChange listener in the cursor an opportunity to kick in. + int memoryLimit = 12 * messageSize; protected void setUp() throws Exception { super.setUp(); if (broker == null) { broker = new BrokerService(); + broker.setAdvisorySupport(false); } } @@ -67,6 +70,7 @@ public class StoreBasedCursorTest extends TestCase { protected void start() throws Exception { broker.start(); factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true"); + factory.setWatchTopicAdvisories(false); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java index cd8685eadc..24cbace0db 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java @@ -99,6 +99,7 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase { underTest.reset(); while (underTest.hasNext() && dequeueCount < count) { MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); underTest.remove(); assertEquals(dequeueCount++, ref.getMessageId() .getProducerSequenceId()); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index e3ae3f78c9..287ba08446 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -43,6 +43,9 @@ import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import static org.apache.activemq.TestSupport.getDestination; +import static org.apache.activemq.TestSupport.getDestinationStatistics; + public class ExpiredMessagesTest extends CombinationTestSupport { @@ -124,7 +127,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.join(); session.close(); - final DestinationStatistics view = this.getDestinationStatistics(destination); + final DestinationStatistics view = getDestinationStatistics(broker, destination); // wait for all to inflight to expire assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { @@ -165,7 +168,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount(); final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue; - final DestinationStatistics dlqView = getDestinationStatistics(dlqDestination); + final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination); LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount() + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount()); @@ -177,8 +180,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport { assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount()); // memory check - assertEquals("memory usage is back to duck egg", 0, this.getDestination(destination).getMemoryUsage().getPercentUsage()); - assertTrue("memory usage is increased ", 0 < this.getDestination(dlqDestination).getMemoryUsage().getPercentUsage()); + assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage()); + assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage()); // verify DLQ MessageConsumer dlqConsumer = createDlqConsumer(connection); @@ -243,7 +246,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.start(); producingThread.join(); - DestinationStatistics view = getDestinationStatistics(destination); + DestinationStatistics view = getDestinationStatistics(broker, destination); LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " @@ -263,7 +266,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - DestinationStatistics view = getDestinationStatistics(destination); + DestinationStatistics view = getDestinationStatistics(broker, destination); LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + ", dispatched: " @@ -275,7 +278,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { } }); - view = getDestinationStatistics(destination); + view = getDestinationStatistics(broker, destination); assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount()); assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount()); } @@ -305,26 +308,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { return broker; } - private DestinationStatistics getDestinationStatistics(ActiveMQDestination destination) { - DestinationStatistics result = null; - org.apache.activemq.broker.region.Destination dest = getDestination(destination); - if (dest != null) { - result = dest.getDestinationStatistics(); - } - return result; - } - private org.apache.activemq.broker.region.Destination getDestination(ActiveMQDestination destination) { - org.apache.activemq.broker.region.Destination result = null; - RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { - if (dest.getName().equals(destination.getPhysicalName())) { - result = dest; - break; - } - } - return result; - } protected void tearDown() throws Exception { connection.stop(); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index b8371854af..f433caf822 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -33,8 +33,11 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.util.Wait; @@ -53,6 +56,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { MessageProducer producer; public ActiveMQDestination destination = new ActiveMQQueue("test"); public boolean optimizedDispatch = true; + public PendingQueueMessageStoragePolicy pendingQueuePolicy; public static Test suite() { return suite(ExpiredMessagesWithNoConsumerTest.class); @@ -82,6 +86,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { defaultEntry.setOptimizedDispatch(optimizedDispatch ); defaultEntry.setExpireMessagesPeriod(800); defaultEntry.setMaxExpirePageSize(800); + + defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); if (memoryLimit) { // so memory is not consumed by DLQ turn if off @@ -99,6 +105,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { public void initCombosForTestExpiredMessagesWithNoConsumer() { addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()}); } public void testExpiredMessagesWithNoConsumer() throws Exception { @@ -111,7 +118,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { producer = session.createProducer(destination); producer.setTimeToLive(1000); connection.start(); - final long sendCount = 2000; + final long sendCount = 2000; final Thread producingThread = new Thread("Producing Thread") { public void run() { @@ -154,6 +161,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { + ", size= " + view.getQueueSize()); assertEquals("All sent have expired", sendCount, view.getExpiredCount()); + assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage()); } // first ack delivered after expiry diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java new file mode 100644 index 0000000000..3366174723 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usecases; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.Before; +import org.junit.Test; + +public class UnlimitedEnqueueTest { + + BrokerService brokerService = null; + final long numMessages = 50000; + final long numThreads = 10; + + @Test + public void testEnqueueIsOnlyLimitedByDisk() throws Exception { + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i=0; i entries = new ArrayList(); + PolicyEntry policy = new PolicyEntry(); + + // NB: ensure queue cursor limit is below the default 70% usage that the destination will use + // if they are the same, the queue memory limit and flow control will kick in first + policy.setCursorMemoryHighWaterMark(20); + + // on by default + //policy.setProducerFlowControl(true); + policy.setQueue(">"); + + // policy that will spool references to disk + policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); + entries.add(policy); + policyMap.setPolicyEntries(entries); + brokerService.setDestinationPolicy(policyMap); + + brokerService.start(); + } + + public class Producer implements Runnable{ + + private final long numberOfMessages; + + public Producer(final long n){ + this.numberOfMessages = n; + } + + public void run(){ + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); + try { + Connection conn = factory.createConnection(); + conn.start(); + for (int i = 0; i < numberOfMessages; i++) { + Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("test-queue"); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + BytesMessage message = session.createBytesMessage(); + byte[] bytes = new byte[1024*10]; + message.writeBytes(bytes); + try { + producer.send(message); + } catch (ResourceAllocationException e) { + e.printStackTrace(); + } + session.close(); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } +}