- Added two broker test where each broker has a client that connects and disconnects and the destination is a queue

- Minor updates to some of the test cases and test support

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@367958 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrian T. Co 2006-01-11 07:43:05 +00:00
parent 0d7046918c
commit 1a9d2862a4
5 changed files with 449 additions and 71 deletions

View File

@ -56,6 +56,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
protected int messageSize = 1;
protected boolean persistentDelivery = true;
protected boolean verbose = false;
protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
@ -140,6 +141,14 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
return broker;
}
protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception {
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.factory;
}
return null;
}
protected Connection createConnection(String brokerName) throws Exception {
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
if (brokerItem != null) {
@ -188,6 +197,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = brokerItem.createProducer(destination, sess);
producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
@ -350,7 +360,6 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
try {
c.close();
} catch (ConnectionClosedException e) {
e.printStackTrace();
}
}

View File

@ -27,6 +27,7 @@ import java.net.URI;
* @version $Revision: 1.1.1.1 $
*/
public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 100;
/**
* BrokerA -> BrokerB -> BrokerC
@ -45,7 +46,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let's try to wait for any messages. Should be none.
Thread.sleep(1000);
@ -73,7 +74,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
// Let's try to wait for any messages.
Thread.sleep(1000);
@ -82,8 +83,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
// Total received should be 10
assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount());
// Total received should be 100
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount());
}
/**
@ -103,15 +104,15 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientB = createConsumer("BrokerB", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
msgsB.waitForMessagesToArrive(20);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2);
assertEquals(20, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
}
/**
@ -137,9 +138,9 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Let's try to wait for any messages.
Thread.sleep(1000);
@ -149,7 +150,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
/**
@ -170,9 +171,9 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Let's try to wait for any messages.
Thread.sleep(1000);
@ -182,7 +183,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
}
public void setUp() throws Exception {

View File

@ -27,6 +27,7 @@ import java.net.URI;
* @version $Revision: 1.1.1.1 $
*/
public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 100;
/**
* BrokerA -> BrokerB -> BrokerC
@ -47,22 +48,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(10);
msgsB.waitForMessagesToArrive(20);
msgsC.waitForMessagesToArrive(20);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2);
assertEquals(10, msgsA.getMessageCount());
assertEquals(20, msgsB.getMessageCount());
assertEquals(20, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
}
/**
@ -84,22 +85,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(20);
msgsB.waitForMessagesToArrive(10);
msgsC.waitForMessagesToArrive(20);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 2);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2);
assertEquals(20, msgsA.getMessageCount());
assertEquals(10, msgsB.getMessageCount());
assertEquals(20, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
}
/**
@ -121,22 +122,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(10);
msgsB.waitForMessagesToArrive(30);
msgsC.waitForMessagesToArrive(10);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT);
assertEquals(10, msgsA.getMessageCount());
assertEquals(30, msgsB.getMessageCount());
assertEquals(10, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
}
/**
@ -162,22 +163,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(30);
msgsB.waitForMessagesToArrive(30);
msgsC.waitForMessagesToArrive(30);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3);
assertEquals(30, msgsA.getMessageCount());
assertEquals(30, msgsB.getMessageCount());
assertEquals(30, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
}
/**
@ -198,22 +199,22 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
MessageConsumer clientC = createConsumer("BrokerC", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerB", dest, 10);
sendMessages("BrokerC", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(30);
msgsB.waitForMessagesToArrive(30);
msgsC.waitForMessagesToArrive(30);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3);
assertEquals(30, msgsA.getMessageCount());
assertEquals(30, msgsB.getMessageCount());
assertEquals(30, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
}
public void setUp() throws Exception {

View File

@ -36,6 +36,8 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
* @version $Revision: 1.1.1.1 $
*/
public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 10;
protected List bridges;
protected AtomicInteger msgDispatchCount;
@ -56,20 +58,20 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip
MessageConsumer clientB = createConsumer("BrokerB", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
msgsA.waitForMessagesToArrive(10);
msgsB.waitForMessagesToArrive(10);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT);
assertEquals(10, msgsA.getMessageCount());
assertEquals(10, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsB.getMessageCount());
// Check that 10 message dispatch commands are send over the network
assertEquals(10, msgDispatchCount.get());
assertEquals(MESSAGE_COUNT, msgDispatchCount.get());
}
/**
@ -88,14 +90,14 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip
MessageConsumer clientA = createConsumer("BrokerA", dest);
// Send messages
sendMessages("BrokerA", dest, 10);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
msgsA.waitForMessagesToArrive(10);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
assertEquals(10, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
// Check that no message dispatch commands are send over the network
assertEquals(0, msgDispatchCount.get());

View File

@ -0,0 +1,365 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.MessageConsumer;
import java.net.URI;
/**
* @version $Revision: 1.1.1.1 $
*/
public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 100; // Best if a factor of 100
protected static final int PREFETCH_COUNT = 1;
protected int msgsClient1, msgsClient2;
protected String broker1, broker2;
public void testClientAReceivesOnly() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doOneClientReceivesOnly();
}
public void testClientBReceivesOnly() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doOneClientReceivesOnly();
}
public void doOneClientReceivesOnly() throws Exception {
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create consumers
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Close the second client, messages should be sent to the first client
client2.close();
// Let the first client receive all messages
msgsClient1 += receiveAllMessages(client1);
client1.close();
// First client should have received 100 messages
assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1);
}
public void testClientAReceivesOnlyAfterReconnect() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doOneClientReceivesOnlyAfterReconnect();
}
public void testClientBReceivesOnlyAfterReconnect() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doOneClientReceivesOnlyAfterReconnect();
}
public void doOneClientReceivesOnlyAfterReconnect() throws Exception {
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create first consumer
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send message to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let the first client receive the first 20% of messages
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
// Disconnect the first client
client1.close();
// Create another client for the first broker
client1 = createConsumer(broker1, dest);
Thread.sleep(500);
// Close the second client, messages should be sent to the first client
client2.close();
// Receive the rest of the messages
msgsClient1 += receiveAllMessages(client1);
client1.close();
// The first client should have received 100 messages
assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1);
}
public void testTwoClientsReceiveClientADisconnects() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doTwoClientsReceiveOneClientDisconnects();
}
public void testTwoClientsReceiveClientBDisconnects() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doTwoClientsReceiveOneClientDisconnects();
}
public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create first client
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
// Disconnect the first client
client1.close();
// Let the second client receive the rest of the messages
msgsClient2 += receiveAllMessages(client2);
client2.close();
// First client should have received 20% of the messages
assertEquals("Client for " + broker1 + " should have received 20% of the messages.", (int)(MESSAGE_COUNT * 0.20), msgsClient1);
// Second client should have received 80% of the messages
assertEquals("Client for " + broker2 + " should have received 80% of the messages.", (int)(MESSAGE_COUNT * 0.80), msgsClient2);
}
public void testTwoClientsReceiveClientAReconnects() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
doTwoClientsReceiveOneClientReconnects();
}
public void testTwoClientsReceiveClientBReconnects() throws Exception {
broker1 = "BrokerB";
broker2 = "BrokerA";
doTwoClientsReceiveOneClientReconnects();
}
public void doTwoClientsReceiveOneClientReconnects() throws Exception {
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create the first client
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
// Disconnect the first client
client1.close();
// Let the second client receive 20% more of the total messages
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
// Create another client for broker 1
client1 = createConsumer(broker1, dest);
Thread.sleep(500);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
client1.close();
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
client2.close();
// First client should have received 40 messages
assertEquals("Client for " + broker1 + " should have received 40% of the messages.", (int)(MESSAGE_COUNT * 0.40), msgsClient1);
// Second client should have received 60 messages
assertEquals("Client for " + broker2 + " should have received 60% of the messages.", (int)(MESSAGE_COUNT * 0.60), msgsClient2);
}
public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
// Bridge brokers
bridgeBrokers(broker1, broker2);
bridgeBrokers(broker2, broker1);
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
// Create the first client
MessageConsumer client1 = createConsumer(broker1, dest);
MessageConsumer client2 = createConsumer(broker2, dest);
// Give clients time to register with broker
Thread.sleep(500);
// Always send messages to broker A
sendMessages("BrokerA", dest, MESSAGE_COUNT);
// Let each client receive 20% of the messages - 40% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
// Disconnect both clients
client1.close();
client2.close();
// Create another two clients for each broker
client1 = createConsumer(broker1, dest);
client2 = createConsumer(broker2, dest);
Thread.sleep(500);
// Let each client receive 30% more of the total messages - 60% total
msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30));
client1.close();
msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30));
client2.close();
// First client should have received 50% of the messages
assertEquals("Client for " + broker1 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient1);
// Second client should have received 50% of the messages
assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2);
}
protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
Message msg;
int i;
for (i=0; i<msgCount; i++) {
msg = consumer.receive(1000);
if (msg == null) {
System.err.println("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i);
break;
}
}
return i;
}
protected int receiveAllMessages(MessageConsumer consumer) throws Exception {
int msgsReceived = 0;
Message msg;
do {
msg = consumer.receive(1000);
if (msg != null) {
msgsReceived++;
}
} while (msg != null);
return msgsReceived;
}
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
Connection conn = createConnection(brokerName);
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
return sess.createConsumer(dest);
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
// Configure broker connection factory
ActiveMQConnectionFactory factoryA, factoryB;
factoryA = (ActiveMQConnectionFactory)getConnectionFactory("BrokerA");
factoryB = (ActiveMQConnectionFactory)getConnectionFactory("BrokerB");
// Set prefetch policy
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setAll(PREFETCH_COUNT);
factoryA.setPrefetchPolicy(policy);
factoryB.setPrefetchPolicy(policy);
msgsClient1 = 0;
msgsClient2 = 0;
}
}