Depending on the test configuration parameters, it was possible to get an OutOfMemory error. The causes were:

- The MessageList was holding on to all the messages being consumed, changed this so that it only holds on to the messageIds
 - Was using a non persistent broker, but was sending it persistent messages, in the topic case, he holds on to the messages in a memory based message store.  By default we now send non persistent messages.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360195 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-30 23:25:03 +00:00
parent 022cd57b30
commit 3d95025a47
8 changed files with 79 additions and 88 deletions

View File

@ -19,7 +19,7 @@ package org.apache.activemq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageList;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerFactory;
@ -53,11 +53,12 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
protected boolean useConcurrentSend = true;
protected boolean durable = false;
protected boolean topic = false;
protected boolean persistent = false;
protected BrokerService broker;
protected Destination destination;
protected List connections = Collections.synchronizedList(new ArrayList());
protected MessageList allMessagesList = new MessageList();
protected MessageIdList allMessagesList = new MessageIdList();
protected void startProducers(Destination dest, int msgCount) throws Exception {
startProducers(createConnectionFactory(), dest, msgCount);
@ -108,6 +109,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(session, "" + i);
@ -149,7 +151,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
} else {
consumer = createMessageConsumer(factory.createConnection(), dest);
}
MessageList list = new MessageList();
MessageIdList list = new MessageIdList();
list.setParent(allMessagesList);
consumer.setMessageListener(list);
consumers.put(consumer, list);
@ -222,18 +224,18 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
* Some helpful assertions for multiple consumers.
*/
protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
MessageList messageList = (MessageList)consumers.get(consumer);
messageList.assertAtLeastMessagesReceived(msgCount);
MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
messageIdList.assertAtLeastMessagesReceived(msgCount);
}
protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
MessageList messageList = (MessageList)consumers.get(consumer);
messageList.assertAtMostMessagesReceived(msgCount);
MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
messageIdList.assertAtMostMessagesReceived(msgCount);
}
protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
MessageList messageList = (MessageList)consumers.get(consumer);
messageList.assertMessagesReceivedNoWait(msgCount);
MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
messageIdList.assertMessagesReceivedNoWait(msgCount);
}
protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
@ -260,8 +262,8 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
// now lets count the individual messages received
int totalMsg = 0;
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
MessageList messageList = (MessageList)consumers.get(i.next());
totalMsg += messageList.getMessageCount();
MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
totalMsg += messageIdList.getMessageCount();
}
assertEquals("Total of consumers message count", msgCount, totalMsg);
}

View File

@ -24,6 +24,18 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
topic = true;
}
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
messageCount = 100;
messageSize = 1024 * 1024 * 1; // 1 MB
prefetchCount = 1;
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
}
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
@ -48,18 +60,6 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
}
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;
messageCount = 10;
messageSize = 1024 * 1024 * 1; // 1 MB
prefetchCount = 1;
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
}
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
consumerCount = 2;
producerCount = 1;

View File

@ -21,7 +21,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.util.MessageList;
import org.apache.activemq.util.MessageIdList;
import java.util.Iterator;
import java.util.List;
@ -59,8 +59,8 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
boolean found = false;
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
MessageList messageList = (MessageList)consumers.get(i.next());
int count = messageList.getMessageCount();
MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
int count = messageIdList.getMessageCount();
if (count > 0) {
if (found) {
fail("No other consumers should have received any messages");

View File

@ -21,7 +21,7 @@ import org.apache.activemq.broker.TopicSubscriptionTest;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import org.apache.activemq.util.MessageList;
import org.apache.activemq.util.MessageIdList;
import java.util.List;
import java.util.Iterator;
@ -42,6 +42,12 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
return broker;
}
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
assertReceivedMessagesAreOrdered();
}
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
@ -54,12 +60,6 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
assertReceivedMessagesAreOrdered();
}
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
assertReceivedMessagesAreOrdered();
}
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
@ -98,11 +98,11 @@ public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
// Get basis of order
Iterator i = consumers.keySet().iterator();
MessageList messageOrder = (MessageList)consumers.get(i.next());
MessageIdList messageOrder = (MessageIdList)consumers.get(i.next());
for (;i.hasNext();) {
MessageList messageList = (MessageList)consumers.get(i.next());
assertTrue("Messages are not ordered.", messageOrder.equals(messageList));
MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
assertTrue("Messages are not ordered.", messageOrder.equals(messageIdList));
}
}
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.test.retroactive;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.MessageList;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
@ -60,7 +60,7 @@ public class RetroactiveConsumerTestWithSimpleMessageListTest extends EmbeddedBr
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
MessageList listener = new MessageList();
MessageIdList listener = new MessageIdList();
consumer.setMessageListener(listener);
listener.waitForMessagesToArrive(messageCount);
listener.assertMessagesReceived(messageCount);

View File

@ -19,7 +19,7 @@ package org.apache.activemq.test.retroactive;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.MessageList;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
@ -49,7 +49,7 @@ public class RetroactiveConsumerWithMessageQueryTest extends EmbeddedBrokerTestS
connection.start();
MessageConsumer consumer = session.createConsumer(destination);
MessageList listener = new MessageList();
MessageIdList listener = new MessageIdList();
listener.setVerbose(true);
consumer.setMessageListener(listener);

View File

@ -20,7 +20,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageList;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,13 +47,13 @@ public class PeerTransportTest extends TestCase {
protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers;
protected Connection[] connections;
protected MessageList messageList[];
protected MessageIdList messageIdList[];
protected void setUp() throws Exception {
connections = new Connection[NUMBER_IN_CLUSTER];
producers = new MessageProducer[NUMBER_IN_CLUSTER];
messageList = new MessageList[NUMBER_IN_CLUSTER];
messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
Destination destination = createDestination();
String root = System.getProperty("activemq.store.dir");
@ -67,8 +67,8 @@ public class PeerTransportTest extends TestCase {
producers[i] = session.createProducer(destination);
producers[i].setDeliveryMode(deliveryMode);
MessageConsumer consumer = createMessageConsumer(session, destination);
messageList[i] = new MessageList();
consumer.setMessageListener(messageList[i]);
messageIdList[i] = new MessageIdList();
consumer.setMessageListener(messageIdList[i]);
}
System.out.println("Sleeping to ensure cluster is fully connected");
Thread.sleep(10000);
@ -120,7 +120,7 @@ public class PeerTransportTest extends TestCase {
}
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
messageList[i].assertMessagesReceived(expectedReceiveCount());
messageIdList[i].assertMessagesReceived(expectedReceiveCount());
}
}

View File

@ -16,13 +16,12 @@
*/
package org.apache.activemq.util;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
@ -38,38 +37,38 @@ import junit.framework.Assert;
*
* @version $Revision: 1.6 $
*/
public class MessageList extends Assert implements MessageListener {
private List messages = new ArrayList();
public class MessageIdList extends Assert implements MessageListener {
private List messageIds = new ArrayList();
private Object semaphore;
private boolean verbose;
private MessageListener parent;
private long maximumDuration = 15000L;
public MessageList() {
public MessageIdList() {
this(new Object());
}
public MessageList(Object semaphore) {
public MessageIdList(Object semaphore) {
this.semaphore = semaphore;
}
public boolean equals(Object that) {
if (that instanceof MessageList) {
MessageList thatList = (MessageList) that;
return getMessages().equals(thatList.getMessages());
if (that instanceof MessageIdList) {
MessageIdList thatList = (MessageIdList) that;
return getMessageIds().equals(thatList.getMessageIds());
}
return false;
}
public int hashCode() {
synchronized (semaphore) {
return messages.hashCode() + 1;
return messageIds.hashCode() + 1;
}
}
public String toString() {
synchronized (semaphore) {
return messages.toString();
return messageIds.toString();
}
}
@ -78,31 +77,15 @@ public class MessageList extends Assert implements MessageListener {
*/
public List flushMessages() {
synchronized (semaphore) {
List answer = new ArrayList(messages);
messages.clear();
List answer = new ArrayList(messageIds);
messageIds.clear();
return answer;
}
}
public synchronized List getMessages() {
public synchronized List getMessageIds() {
synchronized (semaphore) {
return new ArrayList(messages);
}
}
public synchronized List getTextMessages() {
synchronized (semaphore) {
ArrayList l = new ArrayList();
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
TextMessage m = (TextMessage) iter.next();
l.add(m.getText());
}
catch (Throwable e) {
l.add("" + e);
}
}
return l;
return new ArrayList(messageIds);
}
}
@ -110,18 +93,24 @@ public class MessageList extends Assert implements MessageListener {
if (parent != null) {
parent.onMessage(message);
}
String id=null;
try {
id = message.getJMSMessageID();
synchronized (semaphore) {
messages.add(message);
messageIds.add(id);
semaphore.notifyAll();
}
if (verbose) {
System.out.println("###Êreceived message: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public int getMessageCount() {
synchronized (semaphore) {
return messages.size();
return messageIds.size();
}
}