resolve https://issues.apache.org/activemq/browse/AMQ-2610 - org.apache.activemq.broker.region.cursors.PendingMessageCursor.next() now increments the reference count before returning a message reference. this allows control over references when browsing or peeking rather than moving/removing

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@911650 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-18 23:49:41 +00:00
parent 0771df3b72
commit 5619cd01a2
16 changed files with 260 additions and 57 deletions

View File

@ -164,6 +164,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
pending.reset(); pending.reset();
while (pending.hasNext()) { while (pending.hasNext()) {
MessageReference node = pending.next(); MessageReference node = pending.next();
node.decrementReferenceCount();
if (node.getMessageId().equals(mdn.getMessageId())) { if (node.getMessageId().equals(mdn.getMessageId())) {
// Synchronize between dispatched list and removal of messages from pending list // Synchronize between dispatched list and removal of messages from pending list
// related to remove subscription action // related to remove subscription action
@ -575,6 +576,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// related to remove subscription action // related to remove subscription action
synchronized(dispatchLock) { synchronized(dispatchLock) {
pending.remove(); pending.remove();
node.decrementReferenceCount();
if( !isDropped(node) && canDispatch(node)) { if( !isDropped(node) && canDispatch(node)) {
// Message may have been sitting in the pending // Message may have been sitting in the pending

View File

@ -808,32 +808,34 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
public Message[] browse() { public Message[] browse() {
List<Message> l = new ArrayList<Message>(); List<Message> browseList = new ArrayList<Message>();
doBrowse(l, getMaxBrowsePageSize()); doBrowse(browseList, getMaxBrowsePageSize());
return l.toArray(new Message[l.size()]); return browseList.toArray(new Message[browseList.size()]);
} }
public void doBrowse(List<Message> l, int max) { public void doBrowse(List<Message> browseList, int max) {
final ConnectionContext connectionContext = createConnectionContext(); final ConnectionContext connectionContext = createConnectionContext();
try { try {
pageInMessages(false); pageInMessages(false);
List<MessageReference> toExpire = new ArrayList<MessageReference>(); List<MessageReference> toExpire = new ArrayList<MessageReference>();
synchronized (dispatchMutex) { synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) { synchronized (pagedInPendingDispatch) {
addAll(pagedInPendingDispatch, l, max, toExpire); addAll(pagedInPendingDispatch, browseList, max, toExpire);
for (MessageReference ref : toExpire) { for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref); pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) { if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInPending: " + ref);
messageExpired(connectionContext, ref); messageExpired(connectionContext, ref);
} }
} }
} }
toExpire.clear(); toExpire.clear();
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
addAll(pagedInMessages.values(), l, max, toExpire); addAll(pagedInMessages.values(), browseList, max, toExpire);
} }
for (MessageReference ref : toExpire) { for (MessageReference ref : toExpire) {
if (broker.isExpired(ref)) { if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInMessages: " + ref);
messageExpired(connectionContext, ref); messageExpired(connectionContext, ref);
} else { } else {
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
@ -842,23 +844,25 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
if (l.size() < getMaxBrowsePageSize()) { if (browseList.size() < getMaxBrowsePageSize()) {
synchronized (messages) { synchronized (messages) {
try { try {
messages.reset(); messages.reset();
while (messages.hasNext() && l.size() < max) { while (messages.hasNext() && browseList.size() < max) {
MessageReference node = messages.next(); MessageReference node = messages.next();
if (node.isExpired()) { if (node.isExpired()) {
if (broker.isExpired(node)) { if (broker.isExpired(node)) {
LOG.debug("expiring from messages: " + node);
messageExpired(connectionContext, createMessageReference(node.getMessage())); messageExpired(connectionContext, createMessageReference(node.getMessage()));
} }
messages.remove(); messages.remove();
} else { } else {
messages.rollback(node.getMessageId()); messages.rollback(node.getMessageId());
if (l.contains(node.getMessage()) == false) { if (browseList.contains(node.getMessage()) == false) {
l.add(node.getMessage()); browseList.add(node.getMessage());
} }
} }
node.decrementReferenceCount();
} }
} finally { } finally {
messages.release(); messages.release();
@ -897,6 +901,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
while (messages.hasNext()) { while (messages.hasNext()) {
try { try {
MessageReference r = messages.next(); MessageReference r = messages.next();
r.decrementReferenceCount();
messages.rollback(r.getMessageId()); messages.rollback(r.getMessageId());
if (msgId.equals(r.getMessageId())) { if (msgId.equals(r.getMessageId())) {
Message m = r.getMessage(); Message m = r.getMessage();
@ -1444,12 +1449,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messages.reset(); messages.reset();
while (messages.hasNext() && count < toPageIn) { while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next(); MessageReference node = messages.next();
node.incrementReferenceCount();
messages.remove(); messages.remove();
QueueMessageReference ref = createMessageReference(node.getMessage()); QueueMessageReference ref = createMessageReference(node.getMessage());
if (ref.isExpired()) { if (ref.isExpired()) {
if (broker.isExpired(ref)) { if (broker.isExpired(ref)) {
messageExpired(createConnectionContext(), ref); messageExpired(createConnectionContext(), ref);
} else {
ref.decrementReferenceCount();
} }
} else { } else {
result.add(ref); result.add(ref);
@ -1467,6 +1473,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (!pagedInMessages.containsKey(ref.getMessageId())) { if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessages.put(ref.getMessageId(), ref); pagedInMessages.put(ref.getMessageId(), ref);
resultList.add(ref); resultList.add(ref);
} else {
ref.decrementReferenceCount();
} }
} }
} }
@ -1657,7 +1665,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messages.reset(); messages.reset();
while (messages.hasNext()) { while (messages.hasNext()) {
MessageReference node = messages.next(); MessageReference node = messages.next();
node.incrementReferenceCount();
messages.remove(); messages.remove();
if (messageId.equals(node.getMessageId())) { if (messageId.equals(node.getMessageId())) {
message = this.createMessageReference(node.getMessage()); message = this.createMessageReference(node.getMessage());

View File

@ -161,6 +161,7 @@ public class TopicSubscription extends AbstractSubscription {
matched.reset(); matched.reset();
while (matched.hasNext()) { while (matched.hasNext()) {
MessageReference node = matched.next(); MessageReference node = matched.next();
node.decrementReferenceCount();
if (broker.isExpired(node)) { if (broker.isExpired(node)) {
matched.remove(); matched.remove();
dispatchedCounter.incrementAndGet(); dispatchedCounter.incrementAndGet();
@ -181,6 +182,7 @@ public class TopicSubscription extends AbstractSubscription {
matched.reset(); matched.reset();
while (matched.hasNext()) { while (matched.hasNext()) {
MessageReference node = matched.next(); MessageReference node = matched.next();
node.decrementReferenceCount();
if (node.getMessageId().equals(mdn.getMessageId())) { if (node.getMessageId().equals(mdn.getMessageId())) {
matched.remove(); matched.remove();
dispatchedCounter.incrementAndGet(); dispatchedCounter.incrementAndGet();
@ -384,8 +386,8 @@ public class TopicSubscription extends AbstractSubscription {
matched.reset(); matched.reset();
while (matched.hasNext() && !isFull()) { while (matched.hasNext() && !isFull()) {
MessageReference message = (MessageReference) matched MessageReference message = (MessageReference) matched.next();
.next(); message.decrementReferenceCount();
matched.remove(); matched.remove();
// Message may have been sitting in the matched list a // Message may have been sitting in the matched list a
// while // while

View File

@ -151,6 +151,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
result = this.iterator.next().getValue(); result = this.iterator.next().getValue();
} }
last = result; last = result;
if (result != null) {
result.incrementReferenceCount();
}
return result; return result;
} }

View File

@ -162,6 +162,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
int count = 0; int count = 0;
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
MessageReference ref = i.next(); MessageReference ref = i.next();
ref.incrementReferenceCount();
result.add(ref); result.add(ref);
count++; count++;
} }
@ -282,8 +283,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
// got from disk // got from disk
message.setRegionDestination(regionDestination); message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
} }
message.incrementReferenceCount();
return message; return message;
} }

View File

@ -108,7 +108,7 @@ public interface PendingMessageCursor extends Service {
boolean hasNext(); boolean hasNext();
/** /**
* @return the next pending message * @return the next pending message with its reference count increment
*/ */
MessageReference next(); MessageReference next();

View File

@ -117,6 +117,9 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
*/ */
public synchronized MessageReference next() { public synchronized MessageReference next() {
last = (MessageReference)iter.next(); last = (MessageReference)iter.next();
if (last != null) {
last.incrementReferenceCount();
}
return last; return last;
} }

View File

@ -407,7 +407,7 @@ public abstract class Usage<T extends Usage> implements Service {
} }
static { static {
executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Usage Async Task"); Thread thread = new Thread(runnable, "Usage Async Task");
thread.setDaemon(true); thread.setDaemon(true);

View File

@ -26,13 +26,14 @@ public class ThreadTracker {
* track the stack trace of callers * track the stack trace of callers
* @param name the method being tracked * @param name the method being tracked
*/ */
public static void track(String name) { public static void track(final String name) {
Tracker t; Tracker t;
final String key = name.intern();
synchronized(trackers) { synchronized(trackers) {
t = trackers.get(name); t = trackers.get(key);
if (t == null) { if (t == null) {
t = new Tracker(); t = new Tracker();
trackers.put(name, t); trackers.put(key, t);
} }
} }
t.track(); t.track();
@ -56,23 +57,30 @@ public class ThreadTracker {
@SuppressWarnings("serial") @SuppressWarnings("serial")
class Trace extends Throwable { class Trace extends Throwable {
public int count = 1; public int count = 1;
public final int size; public final long id;
Trace() { Trace() {
super(); super();
size = this.getStackTrace().length; id = calculateIdentifier();
}
private long calculateIdentifier() {
int len = 0;
for (int i=0; i<this.getStackTrace().length; i++) {
len += this.getStackTrace()[i].toString().intern().hashCode();
}
return len;
} }
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
class Tracker extends HashMap<Integer, Trace> { class Tracker extends HashMap<Long, Trace> {
public void track() { public void track() {
Trace current = new Trace(); Trace current = new Trace();
synchronized(this) { synchronized(this) {
Trace exist = get(current.size); Trace exist = get(current.id);
if (exist != null) { if (exist != null) {
exist.count++; exist.count++;
} else { } else {
put(current.size, current); put(current.id, current);
} }
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq; package org.apache.activemq;
import java.io.File; import java.io.File;
import java.util.Map;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
@ -25,6 +26,11 @@ import javax.jms.Message;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
@ -139,4 +145,32 @@ public class TestSupport extends TestCase {
recursiveDelete(new File(System.getProperty("derby.system.home"))); recursiveDelete(new File(System.getProperty("derby.system.home")));
} }
} }
public static DestinationStatistics getDestinationStatistics(BrokerService broker, ActiveMQDestination destination) {
DestinationStatistics result = null;
org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
if (dest != null) {
result = dest.getDestinationStatistics();
}
return result;
}
public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
org.apache.activemq.broker.region.Destination result = null;
for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
if (dest.getName().equals(destination.getPhysicalName())) {
result = dest;
break;
}
}
return result;
}
private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
ActiveMQDestination destination) {
RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
return destination.isQueue() ?
regionBroker.getQueueRegion().getDestinationMap() :
regionBroker.getTopicRegion().getDestinationMap();
}
} }

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import javax.jms.JMSException;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ThreadTracker;
public class TopicSubscriptionTest extends QueueSubscriptionTest { public class TopicSubscriptionTest extends QueueSubscriptionTest {
protected void setUp() throws Exception { protected void setUp() throws Exception {
@ -24,6 +30,11 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
topic = true; topic = true;
} }
protected void tearDown() throws Exception {
super.tearDown();
ThreadTracker.result();
}
public void testManyProducersManyConsumers() throws Exception { public void testManyProducersManyConsumers() throws Exception {
consumerCount = 40; consumerCount = 40;
producerCount = 20; producerCount = 20;
@ -34,6 +45,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount * consumerCount); assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
assertDestinationMemoryUsageGoesToZero();
} }
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
@ -46,6 +58,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount); assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
} }
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
@ -58,6 +71,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount); assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
} }
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
@ -82,6 +96,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount); assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
} }
public void testOneProducerManyConsumersFewMessages() throws Exception { public void testOneProducerManyConsumersFewMessages() throws Exception {
@ -94,6 +109,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount); assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
} }
public void testOneProducerManyConsumersManyMessages() throws Exception { public void testOneProducerManyConsumersManyMessages() throws Exception {
@ -106,6 +122,7 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount); assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
} }
@ -119,5 +136,12 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
doMultipleClientsTest(); doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount * consumerCount); assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
assertDestinationMemoryUsageGoesToZero();
} }
private void assertDestinationMemoryUsageGoesToZero() throws Exception {
assertEquals("destination memory is back to 0", 0,
TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
}
} }

View File

@ -24,35 +24,38 @@ package org.apache.activemq.broker.region.cursors;
import java.util.Date; import java.util.Date;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import junit.framework.TestCase;
public class StoreBasedCursorTest extends TestCase { public class StoreBasedCursorTest extends TestCase {
protected String bindAddress = "tcp://localhost:60706"; protected String bindAddress = "tcp://localhost:60706";
BrokerService broker; BrokerService broker;
ConnectionFactory factory; ActiveMQConnectionFactory factory;
Connection connection; Connection connection;
Session session; Session session;
Queue queue; Queue queue;
int messageSize = 1024; int messageSize = 1024;
int memoryLimit = 5 * messageSize; // actual message is messageSize*2, and 4*MessageSize would allow 2 messages be delivered, but the flush of the cache is async so the flush
// triggered on 2nd message maxing out the usage may not be in effect for the 3rd message to succeed. Making the memory usage more lenient
// gives the usageChange listener in the cursor an opportunity to kick in.
int memoryLimit = 12 * messageSize;
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
if (broker == null) { if (broker == null) {
broker = new BrokerService(); broker = new BrokerService();
broker.setAdvisorySupport(false);
} }
} }
@ -67,6 +70,7 @@ public class StoreBasedCursorTest extends TestCase {
protected void start() throws Exception { protected void start() throws Exception {
broker.start(); broker.start();
factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true"); factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true");
factory.setWatchTopicAdvisories(false);
connection = factory.createConnection(); connection = factory.createConnection();
connection.start(); connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -99,6 +99,7 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
underTest.reset(); underTest.reset();
while (underTest.hasNext() && dequeueCount < count) { while (underTest.hasNext() && dequeueCount < count) {
MessageReference ref = underTest.next(); MessageReference ref = underTest.next();
ref.decrementReferenceCount();
underTest.remove(); underTest.remove();
assertEquals(dequeueCount++, ref.getMessageId() assertEquals(dequeueCount++, ref.getMessageId()
.getProducerSequenceId()); .getProducerSequenceId());

View File

@ -43,6 +43,9 @@ import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.TestSupport.getDestination;
import static org.apache.activemq.TestSupport.getDestinationStatistics;
public class ExpiredMessagesTest extends CombinationTestSupport { public class ExpiredMessagesTest extends CombinationTestSupport {
@ -124,7 +127,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
producingThread.join(); producingThread.join();
session.close(); session.close();
final DestinationStatistics view = this.getDestinationStatistics(destination); final DestinationStatistics view = getDestinationStatistics(broker, destination);
// wait for all to inflight to expire // wait for all to inflight to expire
assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
@ -165,7 +168,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount(); final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue; final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
final DestinationStatistics dlqView = getDestinationStatistics(dlqDestination); final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination);
LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount() LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount()
+ ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount()); + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
@ -177,8 +180,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount()); assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount());
// memory check // memory check
assertEquals("memory usage is back to duck egg", 0, this.getDestination(destination).getMemoryUsage().getPercentUsage()); assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
assertTrue("memory usage is increased ", 0 < this.getDestination(dlqDestination).getMemoryUsage().getPercentUsage()); assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
// verify DLQ // verify DLQ
MessageConsumer dlqConsumer = createDlqConsumer(connection); MessageConsumer dlqConsumer = createDlqConsumer(connection);
@ -243,7 +246,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
producingThread.start(); producingThread.start();
producingThread.join(); producingThread.join();
DestinationStatistics view = getDestinationStatistics(destination); DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: " + view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: " + view.getDequeues().getCount() + ", dispatched: "
@ -263,7 +266,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
DestinationStatistics view = getDestinationStatistics(destination); DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: " + view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: " + view.getDequeues().getCount() + ", dispatched: "
@ -275,7 +278,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
} }
}); });
view = getDestinationStatistics(destination); view = getDestinationStatistics(broker, destination);
assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount()); assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount()); assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
} }
@ -305,26 +308,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
return broker; return broker;
} }
private DestinationStatistics getDestinationStatistics(ActiveMQDestination destination) {
DestinationStatistics result = null;
org.apache.activemq.broker.region.Destination dest = getDestination(destination);
if (dest != null) {
result = dest.getDestinationStatistics();
}
return result;
}
private org.apache.activemq.broker.region.Destination getDestination(ActiveMQDestination destination) {
org.apache.activemq.broker.region.Destination result = null;
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
if (dest.getName().equals(destination.getPhysicalName())) {
result = dest;
break;
}
}
return result;
}
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
connection.stop(); connection.stop();

View File

@ -33,8 +33,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
@ -53,6 +56,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
MessageProducer producer; MessageProducer producer;
public ActiveMQDestination destination = new ActiveMQQueue("test"); public ActiveMQDestination destination = new ActiveMQQueue("test");
public boolean optimizedDispatch = true; public boolean optimizedDispatch = true;
public PendingQueueMessageStoragePolicy pendingQueuePolicy;
public static Test suite() { public static Test suite() {
return suite(ExpiredMessagesWithNoConsumerTest.class); return suite(ExpiredMessagesWithNoConsumerTest.class);
@ -83,6 +87,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
defaultEntry.setExpireMessagesPeriod(800); defaultEntry.setExpireMessagesPeriod(800);
defaultEntry.setMaxExpirePageSize(800); defaultEntry.setMaxExpirePageSize(800);
defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
if (memoryLimit) { if (memoryLimit) {
// so memory is not consumed by DLQ turn if off // so memory is not consumed by DLQ turn if off
defaultEntry.setDeadLetterStrategy(null); defaultEntry.setDeadLetterStrategy(null);
@ -99,6 +105,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
public void initCombosForTestExpiredMessagesWithNoConsumer() { public void initCombosForTestExpiredMessagesWithNoConsumer() {
addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE}); addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE});
addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});
} }
public void testExpiredMessagesWithNoConsumer() throws Exception { public void testExpiredMessagesWithNoConsumer() throws Exception {
@ -154,6 +161,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
+ ", size= " + view.getQueueSize()); + ", size= " + view.getQueueSize());
assertEquals("All sent have expired", sendCount, view.getExpiredCount()); assertEquals("All sent have expired", sendCount, view.getExpiredCount());
assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
} }
// first ack delivered after expiry // first ack delivered after expiry

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.Before;
import org.junit.Test;
public class UnlimitedEnqueueTest {
BrokerService brokerService = null;
final long numMessages = 50000;
final long numThreads = 10;
@Test
public void testEnqueueIsOnlyLimitedByDisk() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i<numThreads; i++) {
executor.execute(new Producer(numMessages/numThreads));
}
executor.shutdown();
executor.awaitTermination(30*60, TimeUnit.SECONDS);
}
@Before
public void createBrokerService() throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setAdvisorySupport(false);
// optional, reduce the usage limit so that spooling will occur faster
brokerService.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
PolicyMap policyMap = new PolicyMap();
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
PolicyEntry policy = new PolicyEntry();
// NB: ensure queue cursor limit is below the default 70% usage that the destination will use
// if they are the same, the queue memory limit and flow control will kick in first
policy.setCursorMemoryHighWaterMark(20);
// on by default
//policy.setProducerFlowControl(true);
policy.setQueue(">");
// policy that will spool references to disk
policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
entries.add(policy);
policyMap.setPolicyEntries(entries);
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
}
public class Producer implements Runnable{
private final long numberOfMessages;
public Producer(final long n){
this.numberOfMessages = n;
}
public void run(){
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
try {
Connection conn = factory.createConnection();
conn.start();
for (int i = 0; i < numberOfMessages; i++) {
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test-queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] bytes = new byte[1024*10];
message.writeBytes(bytes);
try {
producer.send(message);
} catch (ResourceAllocationException e) {
e.printStackTrace();
}
session.close();
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
}