diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 21cd3d7113..808a6a78e5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -56,6 +56,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { protected int messageSize = 1; + protected boolean persistentDelivery = true; protected boolean verbose = false; protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { @@ -140,6 +141,14 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { return broker; } + protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception { + BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.factory; + } + return null; + } + protected Connection createConnection(String brokerName) throws Exception { BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName); if (brokerItem != null) { @@ -188,6 +197,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = brokerItem.createProducer(destination, sess); + producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); for (int i = 0; i < count; i++) { TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i); @@ -350,7 +360,6 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { try { c.close(); } catch (ConnectionClosedException e) { - e.printStackTrace(); } } diff --git a/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java b/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java index 89f2097c3a..47e8c89c87 100644 --- a/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java +++ b/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java @@ -27,6 +27,7 @@ import java.net.URI; * @version $Revision: 1.1.1.1 $ */ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { + protected static final int MESSAGE_COUNT = 100; /** * BrokerA -> BrokerB -> BrokerC @@ -45,7 +46,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); // Let's try to wait for any messages. Should be none. Thread.sleep(1000); @@ -73,7 +74,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerB", dest, 10); + sendMessages("BrokerB", dest, MESSAGE_COUNT); // Let's try to wait for any messages. Thread.sleep(1000); @@ -82,8 +83,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - // Total received should be 10 - assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount()); + // Total received should be 100 + assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount()); } /** @@ -103,15 +104,15 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientB = createConsumer("BrokerB", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); - msgsB.waitForMessagesToArrive(20); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2); - assertEquals(20, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount()); } /** @@ -137,9 +138,9 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Let's try to wait for any messages. Thread.sleep(1000); @@ -149,7 +150,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); } /** @@ -170,9 +171,9 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Let's try to wait for any messages. Thread.sleep(1000); @@ -182,7 +183,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); } public void setUp() throws Exception { diff --git a/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java b/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java index e658887978..322f1cf342 100644 --- a/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java +++ b/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java @@ -27,6 +27,7 @@ import java.net.URI; * @version $Revision: 1.1.1.1 $ */ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { + protected static final int MESSAGE_COUNT = 100; /** * BrokerA -> BrokerB -> BrokerC @@ -47,22 +48,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - msgsA.waitForMessagesToArrive(10); - msgsB.waitForMessagesToArrive(20); - msgsC.waitForMessagesToArrive(20); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2); + msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2); - assertEquals(10, msgsA.getMessageCount()); - assertEquals(20, msgsB.getMessageCount()); - assertEquals(20, msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount()); } /** @@ -84,22 +85,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - msgsA.waitForMessagesToArrive(20); - msgsB.waitForMessagesToArrive(10); - msgsC.waitForMessagesToArrive(20); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 2); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT); + msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2); - assertEquals(20, msgsA.getMessageCount()); - assertEquals(10, msgsB.getMessageCount()); - assertEquals(20, msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT * 2, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount()); } /** @@ -121,22 +122,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - msgsA.waitForMessagesToArrive(10); - msgsB.waitForMessagesToArrive(30); - msgsC.waitForMessagesToArrive(10); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3); + msgsC.waitForMessagesToArrive(MESSAGE_COUNT); - assertEquals(10, msgsA.getMessageCount()); - assertEquals(30, msgsB.getMessageCount()); - assertEquals(10, msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsC.getMessageCount()); } /** @@ -162,22 +163,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - msgsA.waitForMessagesToArrive(30); - msgsB.waitForMessagesToArrive(30); - msgsC.waitForMessagesToArrive(30); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3); + msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3); - assertEquals(30, msgsA.getMessageCount()); - assertEquals(30, msgsB.getMessageCount()); - assertEquals(30, msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount()); } /** @@ -198,22 +199,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientC = createConsumer("BrokerC", dest); // Send messages - sendMessages("BrokerA", dest, 10); - sendMessages("BrokerB", dest, 10); - sendMessages("BrokerC", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); + sendMessages("BrokerB", dest, MESSAGE_COUNT); + sendMessages("BrokerC", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); - msgsA.waitForMessagesToArrive(30); - msgsB.waitForMessagesToArrive(30); - msgsC.waitForMessagesToArrive(30); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3); + msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3); - assertEquals(30, msgsA.getMessageCount()); - assertEquals(30, msgsB.getMessageCount()); - assertEquals(30, msgsC.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount()); } public void setUp() throws Exception { diff --git a/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java b/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java index 79fc64fbaf..60d4d61fb5 100644 --- a/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java +++ b/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java @@ -36,6 +36,8 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; * @version $Revision: 1.1.1.1 $ */ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultipleBrokersTestSupport { + protected static final int MESSAGE_COUNT = 10; + protected List bridges; protected AtomicInteger msgDispatchCount; @@ -56,20 +58,20 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip MessageConsumer clientB = createConsumer("BrokerB", dest); // Send messages - sendMessages("BrokerA", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); - msgsA.waitForMessagesToArrive(10); - msgsB.waitForMessagesToArrive(10); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT); + msgsB.waitForMessagesToArrive(MESSAGE_COUNT); - assertEquals(10, msgsA.getMessageCount()); - assertEquals(10, msgsB.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsB.getMessageCount()); // Check that 10 message dispatch commands are send over the network - assertEquals(10, msgDispatchCount.get()); + assertEquals(MESSAGE_COUNT, msgDispatchCount.get()); } /** @@ -88,14 +90,14 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip MessageConsumer clientA = createConsumer("BrokerA", dest); // Send messages - sendMessages("BrokerA", dest, 10); + sendMessages("BrokerA", dest, MESSAGE_COUNT); // Get message count MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); - msgsA.waitForMessagesToArrive(10); + msgsA.waitForMessagesToArrive(MESSAGE_COUNT); - assertEquals(10, msgsA.getMessageCount()); + assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); // Check that no message dispatch commands are send over the network assertEquals(0, msgDispatchCount.get()); diff --git a/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java new file mode 100644 index 0000000000..a631e76f6d --- /dev/null +++ b/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java @@ -0,0 +1,365 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageConsumer; +import java.net.URI; + +/** + * @version $Revision: 1.1.1.1 $ + */ +public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport { + protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100 + protected static final int PREFETCH_COUNT = 1; + + protected int msgsClient1, msgsClient2; + protected String broker1, broker2; + + public void testClientAReceivesOnly() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + doOneClientReceivesOnly(); + } + + public void testClientBReceivesOnly() throws Exception { + broker1 = "BrokerB"; + broker2 = "BrokerA"; + + doOneClientReceivesOnly(); + } + + public void doOneClientReceivesOnly() throws Exception { + // Bridge brokers + bridgeBrokers(broker1, broker2); + bridgeBrokers(broker2, broker1); + + // Run brokers + startAllBrokers(); + + // Create queue + Destination dest = createDestination("TEST.FOO", false); + + // Create consumers + MessageConsumer client1 = createConsumer(broker1, dest); + MessageConsumer client2 = createConsumer(broker2, dest); + + // Give clients time to register with broker + Thread.sleep(500); + + // Always send messages to broker A + sendMessages("BrokerA", dest, MESSAGE_COUNT); + + // Close the second client, messages should be sent to the first client + client2.close(); + + // Let the first client receive all messages + msgsClient1 += receiveAllMessages(client1); + client1.close(); + + // First client should have received 100 messages + assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1); + } + + public void testClientAReceivesOnlyAfterReconnect() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + doOneClientReceivesOnlyAfterReconnect(); + } + + public void testClientBReceivesOnlyAfterReconnect() throws Exception { + broker1 = "BrokerB"; + broker2 = "BrokerA"; + + doOneClientReceivesOnlyAfterReconnect(); + } + + public void doOneClientReceivesOnlyAfterReconnect() throws Exception { + // Bridge brokers + bridgeBrokers(broker1, broker2); + bridgeBrokers(broker2, broker1); + + // Run brokers + startAllBrokers(); + + // Create queue + Destination dest = createDestination("TEST.FOO", false); + + // Create first consumer + MessageConsumer client1 = createConsumer(broker1, dest); + MessageConsumer client2 = createConsumer(broker2, dest); + + // Give clients time to register with broker + Thread.sleep(500); + + // Always send message to broker A + sendMessages("BrokerA", dest, MESSAGE_COUNT); + + // Let the first client receive the first 20% of messages + msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); + + // Disconnect the first client + client1.close(); + + // Create another client for the first broker + client1 = createConsumer(broker1, dest); + Thread.sleep(500); + + // Close the second client, messages should be sent to the first client + client2.close(); + + // Receive the rest of the messages + msgsClient1 += receiveAllMessages(client1); + client1.close(); + + // The first client should have received 100 messages + assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1); + } + + public void testTwoClientsReceiveClientADisconnects() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + doTwoClientsReceiveOneClientDisconnects(); + } + + public void testTwoClientsReceiveClientBDisconnects() throws Exception { + broker1 = "BrokerB"; + broker2 = "BrokerA"; + + doTwoClientsReceiveOneClientDisconnects(); + } + + public void doTwoClientsReceiveOneClientDisconnects() throws Exception { + // Bridge brokers + bridgeBrokers(broker1, broker2); + bridgeBrokers(broker2, broker1); + + // Run brokers + startAllBrokers(); + + // Create queue + Destination dest = createDestination("TEST.FOO", false); + + // Create first client + MessageConsumer client1 = createConsumer(broker1, dest); + MessageConsumer client2 = createConsumer(broker2, dest); + + // Give clients time to register with broker + Thread.sleep(500); + + // Always send messages to broker A + sendMessages("BrokerA", dest, MESSAGE_COUNT); + + // Let each client receive 20% of the messages - 40% total + msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); + msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); + + // Disconnect the first client + client1.close(); + + // Let the second client receive the rest of the messages + msgsClient2 += receiveAllMessages(client2); + client2.close(); + + // First client should have received 20% of the messages + assertEquals("Client for " + broker1 + " should have received 20% of the messages.", (int)(MESSAGE_COUNT * 0.20), msgsClient1); + + // Second client should have received 80% of the messages + assertEquals("Client for " + broker2 + " should have received 80% of the messages.", (int)(MESSAGE_COUNT * 0.80), msgsClient2); + } + + public void testTwoClientsReceiveClientAReconnects() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + doTwoClientsReceiveOneClientReconnects(); + } + + public void testTwoClientsReceiveClientBReconnects() throws Exception { + broker1 = "BrokerB"; + broker2 = "BrokerA"; + + doTwoClientsReceiveOneClientReconnects(); + } + + public void doTwoClientsReceiveOneClientReconnects() throws Exception { + // Bridge brokers + bridgeBrokers(broker1, broker2); + bridgeBrokers(broker2, broker1); + + // Run brokers + startAllBrokers(); + + // Create queue + Destination dest = createDestination("TEST.FOO", false); + + // Create the first client + MessageConsumer client1 = createConsumer(broker1, dest); + MessageConsumer client2 = createConsumer(broker2, dest); + + // Give clients time to register with broker + Thread.sleep(500); + + // Always send messages to broker A + sendMessages("BrokerA", dest, MESSAGE_COUNT); + + // Let each client receive 20% of the messages - 40% total + msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); + msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); + + // Disconnect the first client + client1.close(); + + // Let the second client receive 20% more of the total messages + msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); + + // Create another client for broker 1 + client1 = createConsumer(broker1, dest); + Thread.sleep(500); + + // Let each client receive 20% of the messages - 40% total + msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); + client1.close(); + + msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); + client2.close(); + + // First client should have received 40 messages + assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1); + + // Second client should have received 60 messages + assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2); + } + + public void testTwoClientsReceiveTwoClientReconnects() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + // Bridge brokers + bridgeBrokers(broker1, broker2); + bridgeBrokers(broker2, broker1); + + // Run brokers + startAllBrokers(); + + // Create queue + Destination dest = createDestination("TEST.FOO", false); + + // Create the first client + MessageConsumer client1 = createConsumer(broker1, dest); + MessageConsumer client2 = createConsumer(broker2, dest); + + // Give clients time to register with broker + Thread.sleep(500); + + // Always send messages to broker A + sendMessages("BrokerA", dest, MESSAGE_COUNT); + + // Let each client receive 20% of the messages - 40% total + msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20)); + msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20)); + + // Disconnect both clients + client1.close(); + client2.close(); + + // Create another two clients for each broker + client1 = createConsumer(broker1, dest); + client2 = createConsumer(broker2, dest); + Thread.sleep(500); + + // Let each client receive 30% more of the total messages - 60% total + msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30)); + client1.close(); + + msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30)); + client2.close(); + + // First client should have received 50% of the messages + assertEquals("Client for " + broker1 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient1); + + // Second client should have received 50% of the messages + assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2); + } + + protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception { + Message msg; + int i; + for (i=0; i