Fixed contains method in PrioritizedPendinList which was not returning
correctly.  This was causing messages to not be removed from the
dispatchPendingList when purge was called inside a Queue leading to an
eventual OOM error if enough messages were purged. This fix also
improves performance of the contains method.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-12-01 19:33:53 +00:00
parent 5a27bdf07e
commit 8363c99b51
2 changed files with 32 additions and 11 deletions

View File

@ -156,10 +156,9 @@ public class PrioritizedPendingList implements PendingList {
@Override
public boolean contains(MessageReference message) {
if (map.values().contains(message)) {
return true;
if (message != null) {
return this.map.containsKey(message.getMessageId());
}
return false;
}

View File

@ -30,6 +30,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
@ -57,6 +58,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
Queue queue;
MessageConsumer consumer;
@Override
protected void setUp() throws Exception {
setMaxTestTime(10*60*1000); // 10 mins
setAutoFail(true);
@ -78,6 +80,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
connection.start();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
if (consumer != null) {
@ -90,7 +93,15 @@ public class QueuePurgeTest extends CombinationTestSupport {
}
public void testPurgeLargeQueue() throws Exception {
applyBrokerSpoolingPolicy();
testPurgeLargeQueue(false);
}
public void testPurgeLargeQueuePrioritizedMessages() throws Exception {
testPurgeLargeQueue(true);
}
private void testPurgeLargeQueue(boolean prioritizedMessages) throws Exception {
applyBrokerSpoolingPolicy(prioritizedMessages);
createProducerAndSendMessages(NUM_TO_SEND);
QueueViewMBean proxy = getProxyToQueueViewMBean();
LOG.info("purging..");
@ -127,10 +138,11 @@ public class QueuePurgeTest extends CombinationTestSupport {
proxy.getQueueSize());
assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
}
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
applyBrokerSpoolingPolicy();
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
applyBrokerSpoolingPolicy(false);
final int expiryPeriod = 500;
applyExpiryDuration(expiryPeriod);
createProducerAndSendMessages(NUM_TO_SEND);
@ -140,15 +152,16 @@ public class QueuePurgeTest extends CombinationTestSupport {
assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND,
proxy.getQueueSize());
}
private void applyExpiryDuration(int i) {
broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
}
private void applyBrokerSpoolingPolicy() {
private void applyBrokerSpoolingPolicy(boolean prioritizedMessages) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setPrioritizedMessages(prioritizedMessages);
defaultEntry.setProducerFlowControl(false);
PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
@ -156,9 +169,17 @@ public class QueuePurgeTest extends CombinationTestSupport {
broker.setDestinationPolicy(policyMap);
}
public void testPurgeLargeQueueWithConsumer() throws Exception {
applyBrokerSpoolingPolicy();
public void testPurgeLargeQueueWithConsumer() throws Exception {
testPurgeLargeQueueWithConsumer(false);
}
public void testPurgeLargeQueueWithConsumerPrioritizedMessages() throws Exception {
testPurgeLargeQueueWithConsumer(true);
}
private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception {
applyBrokerSpoolingPolicy(prioritizedMessages);
createProducerAndSendMessages(NUM_TO_SEND);
QueueViewMBean proxy = getProxyToQueueViewMBean();
createConsumer();
@ -177,6 +198,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
}
} while (msg != null);
assertEquals("Queue size not valid", 0, proxy.getQueueSize());
assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
}
private QueueViewMBean getProxyToQueueViewMBean()