mirror of https://github.com/apache/activemq.git
refactored the test case to use MessageList to avoid timing issues
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@359827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27f7cab3e8
commit
b49256b2e5
|
@ -19,6 +19,7 @@ package org.apache.activemq;
|
|||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.util.MessageList;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
|
||||
|
@ -56,6 +57,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
protected BrokerService broker;
|
||||
protected Destination destination;
|
||||
protected List connections = Collections.synchronizedList(new ArrayList());
|
||||
protected MessageList allMessagesList = new MessageList();
|
||||
|
||||
protected void startProducers(Destination dest, int msgCount) throws Exception {
|
||||
startProducers(createConnectionFactory(), dest, msgCount);
|
||||
|
@ -147,8 +149,10 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
} else {
|
||||
consumer = createMessageConsumer(factory.createConnection(), dest);
|
||||
}
|
||||
// Add consumer object and message list
|
||||
consumers.put(consumer, new ArrayList());
|
||||
MessageList list = new MessageList();
|
||||
list.setParent(allMessagesList);
|
||||
consumer.setMessageListener(list);
|
||||
consumers.put(consumer, list);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -157,12 +161,6 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final MessageConsumer consumer = sess.createConsumer(dest);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
messageList.add(message);
|
||||
}
|
||||
});
|
||||
conn.start();
|
||||
|
||||
return consumer;
|
||||
|
@ -174,19 +172,13 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
messageList.add(message);
|
||||
}
|
||||
});
|
||||
conn.start();
|
||||
|
||||
return consumer;
|
||||
}
|
||||
|
||||
protected void waitForAllMessagesToBeReceived(int timeout) throws Exception {
|
||||
Thread.sleep(timeout);
|
||||
//Thread.sleep(timeout);
|
||||
}
|
||||
|
||||
protected ActiveMQDestination createDestination() throws JMSException {
|
||||
|
@ -230,18 +222,18 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
* Some helpful assertions for multiple consumers.
|
||||
*/
|
||||
protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
assertTrue("Consumer received less than " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() >= msgCount));
|
||||
MessageList messageList = (MessageList)consumers.get(consumer);
|
||||
messageList.assertAtLeastMessagesReceived(msgCount);
|
||||
}
|
||||
|
||||
protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
assertTrue("Consumer received more than " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() <= msgCount));
|
||||
MessageList messageList = (MessageList)consumers.get(consumer);
|
||||
messageList.assertAtMostMessagesReceived(msgCount);
|
||||
}
|
||||
|
||||
protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
assertTrue("Consumer should have received exactly " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() == msgCount));
|
||||
MessageList messageList = (MessageList)consumers.get(consumer);
|
||||
messageList.assertMessagesReceived(msgCount);
|
||||
}
|
||||
|
||||
protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
|
||||
|
@ -263,11 +255,14 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
|||
}
|
||||
|
||||
protected void assertTotalMessagesReceived(int msgCount) {
|
||||
allMessagesList.assertMessagesReceived(msgCount);
|
||||
|
||||
// now lets count the individual messages received
|
||||
int totalMsg = 0;
|
||||
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
|
||||
totalMsg += ((List)consumers.get(i.next())).size();
|
||||
MessageList messageList = (MessageList)consumers.get(i.next());
|
||||
totalMsg += messageList.getMessageCount();
|
||||
}
|
||||
|
||||
assertTrue("Total messages received should have been " + msgCount + ". Actual messages received is " + totalMsg, (totalMsg == msgCount));
|
||||
assertEquals("Total of consumers message count", msgCount, totalMsg);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
|
||||
import org.apache.activemq.util.MessageList;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -58,12 +59,13 @@ public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
|
|||
public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
|
||||
boolean found = false;
|
||||
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
|
||||
List messageList = (List)consumers.get(i.next());
|
||||
if (messageList.size() > 0) {
|
||||
MessageList messageList = (MessageList)consumers.get(i.next());
|
||||
int count = messageList.getMessageCount();
|
||||
if (count > 0) {
|
||||
if (found) {
|
||||
fail("No other consumers should have received any messages");
|
||||
} else {
|
||||
assertTrue("Consumer should have received all " + messageCount + " messages. Actual messages received is " + messageList.size(), messageList.size()==messageCount);
|
||||
assertEquals("Consumer should have received all messages.", messageCount, count);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ public class MessageList extends Assert implements MessageListener {
|
|||
private List messages = new ArrayList();
|
||||
private Object semaphore;
|
||||
private boolean verbose;
|
||||
private MessageListener parent;
|
||||
|
||||
public MessageList() {
|
||||
this(new Object());
|
||||
|
@ -77,6 +78,9 @@ public class MessageList extends Assert implements MessageListener {
|
|||
}
|
||||
|
||||
public void onMessage(Message message) {
|
||||
if (parent != null) {
|
||||
parent.onMessage(message);
|
||||
}
|
||||
synchronized (semaphore) {
|
||||
messages.add(message);
|
||||
semaphore.notifyAll();
|
||||
|
@ -127,6 +131,19 @@ public class MessageList extends Assert implements MessageListener {
|
|||
assertEquals("expected number of messages when received: " + getMessages(), messageCount, getMessageCount());
|
||||
}
|
||||
|
||||
public void assertAtLeastMessagesReceived(int messageCount) {
|
||||
waitForMessagesToArrive(messageCount);
|
||||
|
||||
int actual = getMessageCount();
|
||||
assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount);
|
||||
}
|
||||
|
||||
public void assertAtMostMessagesReceived(int messageCount) {
|
||||
int actual = getMessageCount();
|
||||
assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount);
|
||||
}
|
||||
|
||||
|
||||
public boolean hasReceivedMessage() {
|
||||
return getMessageCount() == 0;
|
||||
}
|
||||
|
@ -143,4 +160,16 @@ public class MessageList extends Assert implements MessageListener {
|
|||
this.verbose = verbose;
|
||||
}
|
||||
|
||||
public MessageListener getParent() {
|
||||
return parent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows a parent listener to be specified such as to aggregate messages consumed across consumers
|
||||
*/
|
||||
public void setParent(MessageListener parent) {
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue