Fixed contains method in PrioritizedPendinList which was not returning
correctly.  This was causing messages to not be removed from the
dispatchPendingList when purge was called inside a Queue leading to an
eventual OOM error if enough messages were purged. This fix also
improves performance of the contains method.

(cherry picked from commit 8363c99b51)
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-12-01 19:33:53 +00:00
parent 95fc593975
commit db87a051ca
2 changed files with 32 additions and 11 deletions

View File

@ -156,10 +156,9 @@ public class PrioritizedPendingList implements PendingList {
@Override @Override
public boolean contains(MessageReference message) { public boolean contains(MessageReference message) {
if (map.values().contains(message)) { if (message != null) {
return true; return this.map.containsKey(message.getMessageId());
} }
return false; return false;
} }

View File

@ -30,6 +30,7 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -57,6 +58,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
Queue queue; Queue queue;
MessageConsumer consumer; MessageConsumer consumer;
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
setMaxTestTime(10*60*1000); // 10 mins setMaxTestTime(10*60*1000); // 10 mins
setAutoFail(true); setAutoFail(true);
@ -78,6 +80,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
connection.start(); connection.start();
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
if (consumer != null) { if (consumer != null) {
@ -90,7 +93,15 @@ public class QueuePurgeTest extends CombinationTestSupport {
} }
public void testPurgeLargeQueue() throws Exception { public void testPurgeLargeQueue() throws Exception {
applyBrokerSpoolingPolicy(); testPurgeLargeQueue(false);
}
public void testPurgeLargeQueuePrioritizedMessages() throws Exception {
testPurgeLargeQueue(true);
}
private void testPurgeLargeQueue(boolean prioritizedMessages) throws Exception {
applyBrokerSpoolingPolicy(prioritizedMessages);
createProducerAndSendMessages(NUM_TO_SEND); createProducerAndSendMessages(NUM_TO_SEND);
QueueViewMBean proxy = getProxyToQueueViewMBean(); QueueViewMBean proxy = getProxyToQueueViewMBean();
LOG.info("purging.."); LOG.info("purging..");
@ -127,10 +138,11 @@ public class QueuePurgeTest extends CombinationTestSupport {
proxy.getQueueSize()); proxy.getQueueSize());
assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled()); assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
assertTrue("got expected info purge log message", gotPurgeLogMessage.get()); assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
} }
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
applyBrokerSpoolingPolicy(); applyBrokerSpoolingPolicy(false);
final int expiryPeriod = 500; final int expiryPeriod = 500;
applyExpiryDuration(expiryPeriod); applyExpiryDuration(expiryPeriod);
createProducerAndSendMessages(NUM_TO_SEND); createProducerAndSendMessages(NUM_TO_SEND);
@ -140,15 +152,16 @@ public class QueuePurgeTest extends CombinationTestSupport {
assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND, assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND,
proxy.getQueueSize()); proxy.getQueueSize());
} }
private void applyExpiryDuration(int i) { private void applyExpiryDuration(int i) {
broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
} }
private void applyBrokerSpoolingPolicy() { private void applyBrokerSpoolingPolicy(boolean prioritizedMessages) {
PolicyMap policyMap = new PolicyMap(); PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry(); PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setPrioritizedMessages(prioritizedMessages);
defaultEntry.setProducerFlowControl(false); defaultEntry.setProducerFlowControl(false);
PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy(); PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
@ -156,9 +169,17 @@ public class QueuePurgeTest extends CombinationTestSupport {
broker.setDestinationPolicy(policyMap); broker.setDestinationPolicy(policyMap);
} }
public void testPurgeLargeQueueWithConsumer() throws Exception { public void testPurgeLargeQueueWithConsumer() throws Exception {
applyBrokerSpoolingPolicy(); testPurgeLargeQueueWithConsumer(false);
}
public void testPurgeLargeQueueWithConsumerPrioritizedMessages() throws Exception {
testPurgeLargeQueueWithConsumer(true);
}
private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception {
applyBrokerSpoolingPolicy(prioritizedMessages);
createProducerAndSendMessages(NUM_TO_SEND); createProducerAndSendMessages(NUM_TO_SEND);
QueueViewMBean proxy = getProxyToQueueViewMBean(); QueueViewMBean proxy = getProxyToQueueViewMBean();
createConsumer(); createConsumer();
@ -177,6 +198,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
} }
} while (msg != null); } while (msg != null);
assertEquals("Queue size not valid", 0, proxy.getQueueSize()); assertEquals("Queue size not valid", 0, proxy.getQueueSize());
assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
} }
private QueueViewMBean getProxyToQueueViewMBean() private QueueViewMBean getProxyToQueueViewMBean()