mirror of https://github.com/apache/activemq.git
- Added test support for multiple consumers and producers
- Added test cases for queue and topic subscriptions - Added test cases for the different dispatch policies git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@359769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
13266c83c4
commit
ff457e227a
|
@ -0,0 +1,273 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.net.URI;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.*;
|
||||
|
||||
/**
|
||||
* Test case support used to test multiple message comsumers and message producers connecting to a single broker.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
||||
private AtomicInteger producerLock;
|
||||
|
||||
protected Map consumers = new HashMap(); // Map of consumer with messages received
|
||||
protected int consumerCount = 1;
|
||||
protected int producerCount = 1;
|
||||
|
||||
protected int messageSize = 1024;
|
||||
|
||||
protected boolean useConcurrentSend = true;
|
||||
protected boolean durable = false;
|
||||
protected boolean topic = false;
|
||||
|
||||
protected BrokerService broker;
|
||||
protected Destination destination;
|
||||
protected List connections = Collections.synchronizedList(new ArrayList());
|
||||
|
||||
protected void startProducers(Destination dest, int msgCount) throws Exception {
|
||||
startProducers(createConnectionFactory(), dest, msgCount);
|
||||
}
|
||||
|
||||
protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception {
|
||||
// Use concurrent send
|
||||
if (useConcurrentSend) {
|
||||
producerLock = new AtomicInteger(producerCount);
|
||||
|
||||
for (int i=0; i<producerCount; i++) {
|
||||
Thread t = new Thread(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
sendMessages(factory.createConnection(), dest, msgCount);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
synchronized (producerLock) {
|
||||
producerLock.decrementAndGet();
|
||||
producerLock.notifyAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
t.start();
|
||||
}
|
||||
|
||||
// Wait for all producers to finish sending
|
||||
synchronized (producerLock) {
|
||||
while (producerLock.get() != 0) {
|
||||
producerLock.wait();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Use serialized send
|
||||
} else {
|
||||
for (int i=0; i<producerCount; i++) {
|
||||
sendMessages(factory.createConnection(), dest, msgCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
TextMessage msg = createTextMessage(session, "" + i);
|
||||
producer.send(msg);
|
||||
}
|
||||
|
||||
producer.close();
|
||||
session.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
protected TextMessage createTextMessage(Session session, String initText) throws Exception {
|
||||
TextMessage msg = session.createTextMessage();
|
||||
|
||||
// Pad message text
|
||||
if (initText.length() < messageSize) {
|
||||
char[] data = new char[messageSize - initText.length()];
|
||||
Arrays.fill(data, '*');
|
||||
String str = new String(data);
|
||||
msg.setText(initText + str);
|
||||
|
||||
// Do not pad message text
|
||||
} else {
|
||||
msg.setText(initText);
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
protected void startConsumers(Destination dest) throws Exception {
|
||||
startConsumers(createConnectionFactory(), dest);
|
||||
}
|
||||
|
||||
protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
|
||||
MessageConsumer consumer;
|
||||
for (int i=0; i<consumerCount; i++) {
|
||||
if (durable && topic) {
|
||||
consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i+1));
|
||||
} else {
|
||||
consumer = createMessageConsumer(factory.createConnection(), dest);
|
||||
}
|
||||
// Add consumer object and message list
|
||||
consumers.put(consumer, new ArrayList());
|
||||
}
|
||||
}
|
||||
|
||||
protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception {
|
||||
connections.add(conn);
|
||||
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final MessageConsumer consumer = sess.createConsumer(dest);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
messageList.add(message);
|
||||
}
|
||||
});
|
||||
conn.start();
|
||||
|
||||
return consumer;
|
||||
}
|
||||
|
||||
protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
|
||||
conn.setClientID(name);
|
||||
connections.add(conn);
|
||||
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
messageList.add(message);
|
||||
}
|
||||
});
|
||||
conn.start();
|
||||
|
||||
return consumer;
|
||||
}
|
||||
|
||||
protected void waitForAllMessagesToBeReceived(int timeout) throws Exception {
|
||||
Thread.sleep(timeout);
|
||||
}
|
||||
|
||||
protected ActiveMQDestination createDestination() throws JMSException {
|
||||
if (topic) {
|
||||
destination = new ActiveMQTopic("Topic");
|
||||
return (ActiveMQDestination)destination;
|
||||
} else {
|
||||
destination = new ActiveMQQueue("Queue");
|
||||
return (ActiveMQDestination)destination;
|
||||
}
|
||||
}
|
||||
|
||||
protected ConnectionFactory createConnectionFactory() throws Exception {
|
||||
return new ActiveMQConnectionFactory("vm://localhost");
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
super.setAutoFail(true);
|
||||
super.setUp();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
for (Iterator iter = connections.iterator(); iter.hasNext();) {
|
||||
Connection conn= (Connection) iter.next();
|
||||
try {
|
||||
conn.close();
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
}
|
||||
broker.stop();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
/*
|
||||
* Some helpful assertions for multiple consumers.
|
||||
*/
|
||||
protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
assertTrue("Consumer received less than " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() >= msgCount));
|
||||
}
|
||||
|
||||
protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
assertTrue("Consumer received more than " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() <= msgCount));
|
||||
}
|
||||
|
||||
protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) {
|
||||
List messageList = (List)consumers.get(consumer);
|
||||
assertTrue("Consumer should have received exactly " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() == msgCount));
|
||||
}
|
||||
|
||||
protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
|
||||
for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
|
||||
assertConsumerReceivedAtLeastXMessages((MessageConsumer)i.next(), msgCount);
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
|
||||
for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
|
||||
assertConsumerReceivedAtMostXMessages((MessageConsumer)i.next(), msgCount);
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertEachConsumerReceivedXMessages(int msgCount) {
|
||||
for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
|
||||
assertConsumerReceivedXMessages((MessageConsumer)i.next(), msgCount);
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertTotalMessagesReceived(int msgCount) {
|
||||
int totalMsg = 0;
|
||||
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
|
||||
totalMsg += ((List)consumers.get(i.next())).size();
|
||||
}
|
||||
|
||||
assertTrue("Total messages received should have been " + msgCount + ". Actual messages received is " + totalMsg, (totalMsg == msgCount));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
||||
public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
|
||||
protected int messageCount = 1000; // 1000 Messages per producer
|
||||
protected int prefetchCount = 10;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
durable = false;
|
||||
topic = false;
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 1000;
|
||||
prefetchCount = 1;
|
||||
messageSize = 1024; // 1 Kb
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 1000;
|
||||
prefetchCount = messageCount * 2;
|
||||
messageSize = 1024; // 1 Kb
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 10;
|
||||
prefetchCount = 1;
|
||||
messageSize = 1024 * 1024 * 1; // 2 MB
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 10;
|
||||
prefetchCount = messageCount * 2;
|
||||
messageSize = 1024 * 1024 * 1; // 2 MB
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
messageCount = 10;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
messageCount = 1000;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testManyProducersOneConsumer() throws Exception {
|
||||
consumerCount = 1;
|
||||
producerCount = 50;
|
||||
messageCount = 100;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 50;
|
||||
messageCount = 100;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount);
|
||||
}
|
||||
|
||||
public void doMultipleClientsTest() throws Exception {
|
||||
// Create destination
|
||||
final ActiveMQDestination dest = createDestination();
|
||||
|
||||
// Create consumers
|
||||
ActiveMQConnectionFactory consumerFactory = (ActiveMQConnectionFactory)createConnectionFactory();
|
||||
consumerFactory.getPrefetchPolicy().setAll(prefetchCount);
|
||||
|
||||
startConsumers(consumerFactory, dest);
|
||||
|
||||
// Wait for consumers to setup
|
||||
Thread.sleep(500);
|
||||
|
||||
startProducers(dest, messageCount);
|
||||
|
||||
// Wait for messages to be received. Make it proportional to the messages delivered.
|
||||
waitForAllMessagesToBeReceived((producerCount * messageCount) / 2000);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
public class TopicSubscriptionTest extends QueueSubscriptionTest {
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
durable = true;
|
||||
topic = true;
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
prefetchCount = 1;
|
||||
messageSize = 1024;
|
||||
messageCount = 1000;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 1000;
|
||||
messageSize = 1024;
|
||||
prefetchCount = messageCount * 2;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 10;
|
||||
messageSize = 1024 * 1024 * 1; // 1 MB
|
||||
prefetchCount = 1;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
consumerCount = 2;
|
||||
producerCount = 1;
|
||||
messageCount = 10;
|
||||
messageSize = 1024 * 1024 * 1; // 1 MB
|
||||
prefetchCount = messageCount * 2;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
messageCount = 10;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
consumerCount = 50;
|
||||
producerCount = 1;
|
||||
messageCount = 100;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
|
||||
}
|
||||
|
||||
|
||||
public void testManyProducersOneConsumer() throws Exception {
|
||||
consumerCount = 1;
|
||||
producerCount = 20;
|
||||
messageCount = 100;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
|
||||
}
|
||||
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
consumerCount = 20;
|
||||
producerCount = 20;
|
||||
messageCount = 20;
|
||||
messageSize = 1; // 1 byte
|
||||
prefetchCount = 10;
|
||||
|
||||
doMultipleClientsTest();
|
||||
|
||||
assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.broker.QueueSubscriptionTest;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
|
||||
public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
|
||||
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
|
||||
broker.setDestinationPolicy(pMap);
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
|
||||
|
||||
// Ensure that each consumer should have received at least one message
|
||||
// We cannot guarantee that messages will be equally divided, since prefetch is one
|
||||
assertEachConsumerReceivedAtLeastXMessages(1);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
|
||||
|
||||
// Ensure that each consumer should have received at least one message
|
||||
// We cannot guarantee that messages will be equally divided, since prefetch is one
|
||||
assertEachConsumerReceivedAtLeastXMessages(1);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersFewMessages();
|
||||
|
||||
// Since there are more consumers, each consumer should have received at most one message only
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersManyMessages();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
super.testManyProducersManyConsumers();
|
||||
assertMessagesDividedAmongConsumers();
|
||||
}
|
||||
|
||||
public void assertMessagesDividedAmongConsumers() {
|
||||
assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount);
|
||||
assertEachConsumerReceivedAtMostXMessages(((messageCount * producerCount) / consumerCount) + 1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.broker.QueueSubscriptionTest;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setDispatchPolicy(new SimpleDispatchPolicy());
|
||||
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
|
||||
broker.setDestinationPolicy(pMap);
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
|
||||
|
||||
// One consumer should have received all messages, and the rest none
|
||||
assertOneConsumerReceivedAllMessages(messageCount);
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
|
||||
|
||||
// One consumer should have received all messages, and the rest none
|
||||
assertOneConsumerReceivedAllMessages(messageCount);
|
||||
}
|
||||
|
||||
public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
|
||||
boolean found = false;
|
||||
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
|
||||
List messageList = (List)consumers.get(i.next());
|
||||
if (messageList.size() > 0) {
|
||||
if (found) {
|
||||
fail("No other consumers should have received any messages");
|
||||
} else {
|
||||
assertTrue("Consumer should have received all " + messageCount + " messages. Actual messages received is " + messageList.size(), messageList.size()==messageCount);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
fail("At least one consumer should have received all messages");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TopicSubscriptionTest;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setDispatchPolicy(new StrictOrderDispatchPolicy());
|
||||
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
|
||||
broker.setDestinationPolicy(pMap);
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
|
||||
super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersFewMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersFewMessages();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testOneProducerManyConsumersManyMessages() throws Exception {
|
||||
super.testOneProducerManyConsumersManyMessages();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testManyProducersOneConsumer() throws Exception {
|
||||
super.testManyProducersOneConsumer();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void testManyProducersManyConsumers() throws Exception {
|
||||
super.testManyProducersManyConsumers();
|
||||
|
||||
assertReceivedMessagesAreOrdered();
|
||||
}
|
||||
|
||||
public void assertReceivedMessagesAreOrdered() throws Exception {
|
||||
// If there is only one consumer, messages is definitely ordered
|
||||
if (consumers.size() <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get basis of order
|
||||
Iterator i = consumers.keySet().iterator();
|
||||
List messageOrder = (List)consumers.get(i.next());
|
||||
|
||||
for (;i.hasNext();) {
|
||||
List messageList = (List)consumers.get(i.next());
|
||||
assertTrue("Messages are not ordered.", messageOrder.equals(messageList));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue