diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java new file mode 100644 index 0000000000..34bfa40f4d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java @@ -0,0 +1,273 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerFactory; + +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Arrays; +import java.util.Collections; +import java.net.URI; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.*; + +/** + * Test case support used to test multiple message comsumers and message producers connecting to a single broker. + * + * @version $Revision$ + */ +public class JmsMultipleClientsTestSupport extends CombinationTestSupport { + private AtomicInteger producerLock; + + protected Map consumers = new HashMap(); // Map of consumer with messages received + protected int consumerCount = 1; + protected int producerCount = 1; + + protected int messageSize = 1024; + + protected boolean useConcurrentSend = true; + protected boolean durable = false; + protected boolean topic = false; + + protected BrokerService broker; + protected Destination destination; + protected List connections = Collections.synchronizedList(new ArrayList()); + + protected void startProducers(Destination dest, int msgCount) throws Exception { + startProducers(createConnectionFactory(), dest, msgCount); + } + + protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception { + // Use concurrent send + if (useConcurrentSend) { + producerLock = new AtomicInteger(producerCount); + + for (int i=0; i= msgCount)); + } + + protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) { + List messageList = (List)consumers.get(consumer); + assertTrue("Consumer received more than " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() <= msgCount)); + } + + protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) { + List messageList = (List)consumers.get(consumer); + assertTrue("Consumer should have received exactly " + msgCount + " messages. Actual messages received is " + messageList.size(), (messageList.size() == msgCount)); + } + + protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) { + for (Iterator i=consumers.keySet().iterator();i.hasNext();) { + assertConsumerReceivedAtLeastXMessages((MessageConsumer)i.next(), msgCount); + } + } + + protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) { + for (Iterator i=consumers.keySet().iterator();i.hasNext();) { + assertConsumerReceivedAtMostXMessages((MessageConsumer)i.next(), msgCount); + } + } + + protected void assertEachConsumerReceivedXMessages(int msgCount) { + for (Iterator i=consumers.keySet().iterator();i.hasNext();) { + assertConsumerReceivedXMessages((MessageConsumer)i.next(), msgCount); + } + } + + protected void assertTotalMessagesReceived(int msgCount) { + int totalMsg = 0; + for (Iterator i=consumers.keySet().iterator(); i.hasNext();) { + totalMsg += ((List)consumers.get(i.next())).size(); + } + + assertTrue("Total messages received should have been " + msgCount + ". Actual messages received is " + totalMsg, (totalMsg == msgCount)); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java new file mode 100644 index 0000000000..0b9ae65e6f --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java @@ -0,0 +1,147 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.apache.activemq.JmsMultipleClientsTestSupport; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; + +public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { + protected int messageCount = 1000; // 1000 Messages per producer + protected int prefetchCount = 10; + + protected void setUp() throws Exception { + super.setUp(); + durable = false; + topic = false; + } + + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 1000; + prefetchCount = 1; + messageSize = 1024; // 1 Kb + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 1000; + prefetchCount = messageCount * 2; + messageSize = 1024; // 1 Kb + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 10; + prefetchCount = 1; + messageSize = 1024 * 1024 * 1; // 2 MB + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 10; + prefetchCount = messageCount * 2; + messageSize = 1024 * 1024 * 1; // 2 MB + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testOneProducerManyConsumersFewMessages() throws Exception { + consumerCount = 50; + producerCount = 1; + messageCount = 10; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testOneProducerManyConsumersManyMessages() throws Exception { + consumerCount = 50; + producerCount = 1; + messageCount = 1000; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testManyProducersOneConsumer() throws Exception { + consumerCount = 1; + producerCount = 50; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void testManyProducersManyConsumers() throws Exception { + consumerCount = 50; + producerCount = 50; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + + public void doMultipleClientsTest() throws Exception { + // Create destination + final ActiveMQDestination dest = createDestination(); + + // Create consumers + ActiveMQConnectionFactory consumerFactory = (ActiveMQConnectionFactory)createConnectionFactory(); + consumerFactory.getPrefetchPolicy().setAll(prefetchCount); + + startConsumers(consumerFactory, dest); + + // Wait for consumers to setup + Thread.sleep(500); + + startProducers(dest, messageCount); + + // Wait for messages to be received. Make it proportional to the messages delivered. + waitForAllMessagesToBeReceived((producerCount * messageCount) / 2000); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java new file mode 100644 index 0000000000..924585d57c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java @@ -0,0 +1,123 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +public class TopicSubscriptionTest extends QueueSubscriptionTest { + + protected void setUp() throws Exception { + super.setUp(); + durable = true; + topic = true; + } + + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + prefetchCount = 1; + messageSize = 1024; + messageCount = 1000; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + } + + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 1000; + messageSize = 1024; + prefetchCount = messageCount * 2; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + } + + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 10; + messageSize = 1024 * 1024 * 1; // 1 MB + prefetchCount = 1; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + } + + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 10; + messageSize = 1024 * 1024 * 1; // 1 MB + prefetchCount = messageCount * 2; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + } + + public void testOneProducerManyConsumersFewMessages() throws Exception { + consumerCount = 50; + producerCount = 1; + messageCount = 10; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + } + + public void testOneProducerManyConsumersManyMessages() throws Exception { + consumerCount = 50; + producerCount = 1; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * consumerCount * producerCount); + } + + + public void testManyProducersOneConsumer() throws Exception { + consumerCount = 1; + producerCount = 20; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount * consumerCount); + } + + public void testManyProducersManyConsumers() throws Exception { + consumerCount = 20; + producerCount = 20; + messageCount = 20; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount * consumerCount); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java new file mode 100644 index 0000000000..f0f094301d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java @@ -0,0 +1,88 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.QueueSubscriptionTest; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; +import org.apache.activemq.broker.region.policy.PolicyMap; + +public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new RoundRobinDispatchPolicy()); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesOnePrefetch(); + + // Ensure that each consumer should have received at least one message + // We cannot guarantee that messages will be equally divided, since prefetch is one + assertEachConsumerReceivedAtLeastXMessages(1); + } + + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + assertMessagesDividedAmongConsumers(); + } + + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesOnePrefetch(); + + // Ensure that each consumer should have received at least one message + // We cannot guarantee that messages will be equally divided, since prefetch is one + assertEachConsumerReceivedAtLeastXMessages(1); + } + + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); + assertMessagesDividedAmongConsumers(); + } + + public void testOneProducerManyConsumersFewMessages() throws Exception { + super.testOneProducerManyConsumersFewMessages(); + + // Since there are more consumers, each consumer should have received at most one message only + assertMessagesDividedAmongConsumers(); + } + + public void testOneProducerManyConsumersManyMessages() throws Exception { + super.testOneProducerManyConsumersManyMessages(); + assertMessagesDividedAmongConsumers(); + } + + public void testManyProducersManyConsumers() throws Exception { + super.testManyProducersManyConsumers(); + assertMessagesDividedAmongConsumers(); + } + + public void assertMessagesDividedAmongConsumers() { + assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount); + assertEachConsumerReceivedAtMostXMessages(((messageCount * producerCount) / consumerCount) + 1); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java new file mode 100644 index 0000000000..bfa61fbd70 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java @@ -0,0 +1,76 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.QueueSubscriptionTest; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; + +import java.util.Iterator; +import java.util.List; + +public class SimpleDispatchPolicyTest extends QueueSubscriptionTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new SimpleDispatchPolicy()); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + + // One consumer should have received all messages, and the rest none + assertOneConsumerReceivedAllMessages(messageCount); + } + + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); + + // One consumer should have received all messages, and the rest none + assertOneConsumerReceivedAllMessages(messageCount); + } + + public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception { + boolean found = false; + for (Iterator i=consumers.keySet().iterator(); i.hasNext();) { + List messageList = (List)consumers.get(i.next()); + if (messageList.size() > 0) { + if (found) { + fail("No other consumers should have received any messages"); + } else { + assertTrue("Consumer should have received all " + messageCount + " messages. Actual messages received is " + messageList.size(), messageList.size()==messageCount); + found = true; + } + } + } + + if (!found) { + fail("At least one consumer should have received all messages"); + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java new file mode 100644 index 0000000000..df244e4064 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java @@ -0,0 +1,107 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TopicSubscriptionTest; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy; + +import java.util.List; +import java.util.Iterator; + +public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new StrictOrderDispatchPolicy()); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesOnePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesOnePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + public void testOneProducerManyConsumersFewMessages() throws Exception { + super.testOneProducerManyConsumersFewMessages(); + + assertReceivedMessagesAreOrdered(); + } + + public void testOneProducerManyConsumersManyMessages() throws Exception { + super.testOneProducerManyConsumersManyMessages(); + + assertReceivedMessagesAreOrdered(); + } + + public void testManyProducersOneConsumer() throws Exception { + super.testManyProducersOneConsumer(); + + assertReceivedMessagesAreOrdered(); + } + + public void testManyProducersManyConsumers() throws Exception { + super.testManyProducersManyConsumers(); + + assertReceivedMessagesAreOrdered(); + } + + public void assertReceivedMessagesAreOrdered() throws Exception { + // If there is only one consumer, messages is definitely ordered + if (consumers.size() <= 1) { + return; + } + + // Get basis of order + Iterator i = consumers.keySet().iterator(); + List messageOrder = (List)consumers.get(i.next()); + + for (;i.hasNext();) { + List messageList = (List)consumers.get(i.next()); + assertTrue("Messages are not ordered.", messageOrder.equals(messageList)); + } + } +}