mirror of https://github.com/apache/activemq.git
- Added test cases for multiple brokers
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@367253 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
52aeaa2965
commit
579a1d4130
|
@ -0,0 +1,367 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq;
|
||||||
|
|
||||||
|
import javax.jms.*;
|
||||||
|
|
||||||
|
import org.apache.activemq.util.MessageIdList;
|
||||||
|
import org.apache.activemq.util.IdGenerator;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.xbean.BrokerFactoryBean;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.ConnectionClosedException;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case support that allows the easy management and connection of several brokers.
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
||||||
|
public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
|
||||||
|
public static int MAX_SETUP_TIME = 5000;
|
||||||
|
|
||||||
|
protected Map brokers;
|
||||||
|
protected Map destinations;
|
||||||
|
|
||||||
|
protected int messageSize = 1;
|
||||||
|
|
||||||
|
protected boolean verbose = false;
|
||||||
|
|
||||||
|
protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
|
||||||
|
BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker;
|
||||||
|
BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker;
|
||||||
|
|
||||||
|
bridgeBrokers(localBroker, remoteBroker);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Overwrite this method to specify how you want to bridge the two brokers
|
||||||
|
// By default, bridge them using add network connector of the local broker and the first connector of the remote broker
|
||||||
|
protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
|
||||||
|
List transportConnectors = remoteBroker.getTransportConnectors();
|
||||||
|
URI remoteURI;
|
||||||
|
if (!transportConnectors.isEmpty()) {
|
||||||
|
remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
|
||||||
|
localBroker.addNetworkConnector("static:" + remoteURI);
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker has no registered connectors.");
|
||||||
|
}
|
||||||
|
|
||||||
|
MAX_SETUP_TIME = 2000;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will interconnect all brokes using multicast
|
||||||
|
protected void bridgeAllBrokers() throws Exception {
|
||||||
|
bridgeAllBrokers("default");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void bridgeAllBrokers(String groupName) throws Exception {
|
||||||
|
Collection brokerList = brokers.values();
|
||||||
|
for (Iterator i=brokerList.iterator(); i.hasNext();) {
|
||||||
|
BrokerService broker = ((BrokerItem)i.next()).broker;
|
||||||
|
List transportConnectors = broker.getTransportConnectors();
|
||||||
|
|
||||||
|
if (transportConnectors.isEmpty()) {
|
||||||
|
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
|
||||||
|
transportConnectors = broker.getTransportConnectors();
|
||||||
|
}
|
||||||
|
|
||||||
|
TransportConnector transport = (TransportConnector)transportConnectors.get(0);
|
||||||
|
transport.setDiscoveryUri(new URI("multicast://" + groupName));
|
||||||
|
broker.addNetworkConnector("multicast://" + groupName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multicasting may take longer to setup
|
||||||
|
MAX_SETUP_TIME = 8000;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void startAllBrokers() throws Exception {
|
||||||
|
Collection brokerList = brokers.values();
|
||||||
|
for (Iterator i=brokerList.iterator(); i.hasNext();) {
|
||||||
|
BrokerService broker = ((BrokerItem)i.next()).broker;
|
||||||
|
broker.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(MAX_SETUP_TIME);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker(String brokerName) throws Exception {
|
||||||
|
BrokerService broker = new BrokerService();
|
||||||
|
broker.setBrokerName(brokerName);
|
||||||
|
brokers.put(brokerName, new BrokerItem(broker));
|
||||||
|
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker(URI brokerUri) throws Exception {
|
||||||
|
BrokerService broker = BrokerFactory.createBroker(brokerUri);
|
||||||
|
brokers.put(broker.getBrokerName(), new BrokerItem(broker));
|
||||||
|
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker(Resource configFile) throws Exception {
|
||||||
|
BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile);
|
||||||
|
brokerFactory.afterPropertiesSet();
|
||||||
|
|
||||||
|
BrokerService broker = brokerFactory.getBroker();
|
||||||
|
brokers.put(broker.getBrokerName(), new BrokerItem(broker));
|
||||||
|
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Connection createConnection(String brokerName) throws Exception {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
|
||||||
|
if (brokerItem != null) {
|
||||||
|
return brokerItem.createConnection();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
|
||||||
|
if (brokerItem != null) {
|
||||||
|
return brokerItem.createConsumer(dest);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
|
||||||
|
if (brokerItem != null) {
|
||||||
|
return brokerItem.createDurableSubscriber(dest, name);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MessageIdList getBrokerMessages(String brokerName) {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
|
||||||
|
if (brokerItem != null) {
|
||||||
|
return brokerItem.getAllMessages();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
|
||||||
|
if (brokerItem != null) {
|
||||||
|
return brokerItem.getConsumerMessages(consumer);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
|
||||||
|
|
||||||
|
Connection conn = brokerItem.createConnection();
|
||||||
|
conn.start();
|
||||||
|
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageProducer producer = brokerItem.createProducer(destination, sess);
|
||||||
|
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
|
||||||
|
producer.send(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
sess.close();
|
||||||
|
conn.close();
|
||||||
|
brokerItem.connections.remove(conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TextMessage createTextMessage(Session session, String initText) throws Exception {
|
||||||
|
TextMessage msg = session.createTextMessage();
|
||||||
|
|
||||||
|
// Pad message text
|
||||||
|
if (initText.length() < messageSize) {
|
||||||
|
char[] data = new char[messageSize - initText.length()];
|
||||||
|
Arrays.fill(data, '*');
|
||||||
|
String str = new String(data);
|
||||||
|
msg.setText(initText + str);
|
||||||
|
|
||||||
|
// Do not pad message text
|
||||||
|
} else {
|
||||||
|
msg.setText(initText);
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException {
|
||||||
|
Destination dest;
|
||||||
|
if (topic) {
|
||||||
|
dest = new ActiveMQTopic(name);
|
||||||
|
destinations.put(name, dest);
|
||||||
|
return (ActiveMQDestination)dest;
|
||||||
|
} else {
|
||||||
|
dest = new ActiveMQQueue(name);
|
||||||
|
destinations.put(name, dest);
|
||||||
|
return (ActiveMQDestination)dest;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
brokers = new HashMap();
|
||||||
|
destinations = new HashMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
destroyAllBrokers();
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void destroyBroker(String brokerName) throws Exception {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)brokers.remove(brokerName);
|
||||||
|
|
||||||
|
if (brokerItem != null) {
|
||||||
|
brokerItem.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void destroyAllBrokers() throws Exception {
|
||||||
|
for (Iterator i=brokers.values().iterator(); i.hasNext();) {
|
||||||
|
BrokerItem brokerItem = (BrokerItem)i.next();
|
||||||
|
brokerItem.destroy();
|
||||||
|
}
|
||||||
|
brokers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Class to group broker components together
|
||||||
|
protected class BrokerItem {
|
||||||
|
public BrokerService broker;
|
||||||
|
public ActiveMQConnectionFactory factory;
|
||||||
|
public List connections;
|
||||||
|
public Map consumers;
|
||||||
|
public MessageIdList allMessages = new MessageIdList();
|
||||||
|
|
||||||
|
private IdGenerator id;
|
||||||
|
|
||||||
|
public boolean persistent = false;
|
||||||
|
|
||||||
|
public BrokerItem(BrokerService broker) throws Exception {
|
||||||
|
this.broker = broker;
|
||||||
|
|
||||||
|
factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
|
||||||
|
consumers = Collections.synchronizedMap(new HashMap());
|
||||||
|
connections = Collections.synchronizedList(new ArrayList());
|
||||||
|
allMessages.setVerbose(verbose);
|
||||||
|
id = new IdGenerator(broker.getBrokerName() + ":");
|
||||||
|
}
|
||||||
|
|
||||||
|
public Connection createConnection() throws Exception {
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
conn.setClientID(id.generateId());
|
||||||
|
|
||||||
|
connections.add(conn);
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageConsumer createConsumer(Destination dest) throws Exception {
|
||||||
|
Connection c = createConnection();
|
||||||
|
c.start();
|
||||||
|
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
return createConsumer(dest, s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageConsumer createConsumer(Destination dest, Session sess) throws Exception {
|
||||||
|
MessageConsumer client = sess.createConsumer(dest);
|
||||||
|
MessageIdList messageIdList = new MessageIdList();
|
||||||
|
messageIdList.setParent(allMessages);
|
||||||
|
client.setMessageListener(messageIdList);
|
||||||
|
consumers.put(client, messageIdList);
|
||||||
|
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception {
|
||||||
|
Connection c = createConnection();
|
||||||
|
c.start();
|
||||||
|
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
return createDurableSubscriber(dest, s, name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception {
|
||||||
|
MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name);
|
||||||
|
MessageIdList messageIdList = new MessageIdList();
|
||||||
|
messageIdList.setParent(allMessages);
|
||||||
|
client.setMessageListener(messageIdList);
|
||||||
|
consumers.put(client, messageIdList);
|
||||||
|
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageIdList getAllMessages() {
|
||||||
|
return allMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageIdList getConsumerMessages(MessageConsumer consumer) {
|
||||||
|
return (MessageIdList)consumers.get(consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageProducer createProducer(Destination dest) throws Exception {
|
||||||
|
Connection c = createConnection();
|
||||||
|
c.start();
|
||||||
|
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
return createProducer(dest, s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageProducer createProducer(Destination dest, Session sess) throws Exception {
|
||||||
|
MessageProducer client = sess.createProducer(dest);
|
||||||
|
client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
while (!connections.isEmpty()) {
|
||||||
|
Connection c = (Connection)connections.remove(0);
|
||||||
|
try {
|
||||||
|
c.close();
|
||||||
|
} catch (ConnectionClosedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
broker.stop();
|
||||||
|
consumers.clear();
|
||||||
|
|
||||||
|
broker = null;
|
||||||
|
connections = null;
|
||||||
|
consumers = null;
|
||||||
|
factory = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.util.MessageIdList;
|
||||||
|
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport {
|
||||||
|
public static final int BROKER_COUNT = 5; // number of brokers to network
|
||||||
|
public static final int CONSUMER_COUNT = 3; // consumers per broker
|
||||||
|
public static final int PRODUCER_COUNT = 3; // producers per broker
|
||||||
|
public static final int MESSAGE_COUNT = 10; // messages per producer
|
||||||
|
|
||||||
|
protected Map consumerMap;
|
||||||
|
|
||||||
|
public void testTopicAllConnected() throws Exception {
|
||||||
|
bridgeAllBrokers();
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
|
||||||
|
// Setup topic destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", true);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=0; j<CONSUMER_COUNT; j++) {
|
||||||
|
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=0; j<PRODUCER_COUNT; j++) {
|
||||||
|
sendMessages("Broker" + i, dest, MESSAGE_COUNT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=0; j<CONSUMER_COUNT; j++) {
|
||||||
|
MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
|
||||||
|
msgs.waitForMessagesToArrive(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
|
||||||
|
assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, msgs.getMessageCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testQueueAllConnected() throws Exception {
|
||||||
|
bridgeAllBrokers();
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup topic destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=0; j<CONSUMER_COUNT; j++) {
|
||||||
|
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=0; j<PRODUCER_COUNT; j++) {
|
||||||
|
sendMessages("Broker" + i, dest, MESSAGE_COUNT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for messages to be delivered
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
int totalMsg = 0;
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=0; j<CONSUMER_COUNT; j++) {
|
||||||
|
MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
|
||||||
|
totalMsg += msgs.getMessageCount();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setAutoFail(true);
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
// Setup n brokers
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false"));
|
||||||
|
}
|
||||||
|
|
||||||
|
consumerMap = new HashMap();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.network.DemandForwardingBridge;
|
||||||
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClientsTest {
|
||||||
|
protected List bridges = new ArrayList();
|
||||||
|
|
||||||
|
protected void bridgeAllBrokers(String groupName) throws Exception {
|
||||||
|
for (int i=1; i<=BROKER_COUNT; i++) {
|
||||||
|
for (int j=1; j<=BROKER_COUNT; j++) {
|
||||||
|
if (i != j) {
|
||||||
|
bridgeBrokers("Broker" + i, "Broker" + j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MAX_SETUP_TIME = 5000;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
|
||||||
|
List remoteTransports = remoteBroker.getTransportConnectors();
|
||||||
|
List localTransports = localBroker.getTransportConnectors();
|
||||||
|
|
||||||
|
URI remoteURI, localURI;
|
||||||
|
if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
|
||||||
|
remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
|
||||||
|
localURI = ((TransportConnector)localTransports.get(0)).getConnectUri();
|
||||||
|
|
||||||
|
// Ensure that we are connecting using tcp
|
||||||
|
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
|
||||||
|
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
|
||||||
|
TransportFactory.connect(remoteURI));
|
||||||
|
bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
|
||||||
|
bridges.add(bridge);
|
||||||
|
|
||||||
|
bridge.start();
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker or local broker is not using tcp connectors");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker or local broker has no registered connectors.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
// Assign a tcp connector to each broker
|
||||||
|
int j=0;
|
||||||
|
for (Iterator i=brokers.values().iterator(); i.hasNext();) {
|
||||||
|
((BrokerItem)i.next()).broker.addConnector("tcp://localhost:" + (61616 + j++));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,195 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
||||||
|
import org.apache.activemq.util.MessageIdList;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA -> BrokerB -> BrokerC
|
||||||
|
*/
|
||||||
|
public void test_AB_BC_BrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerA", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerC");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
|
||||||
|
// Let's try to wait for any messages. Should be none.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
assertEquals(0, msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA <- BrokerB -> BrokerC
|
||||||
|
*/
|
||||||
|
public void test_BA_BC_BrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerB", "BrokerA");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerC");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
|
||||||
|
// Let's try to wait for any messages.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
// Total received should be 10
|
||||||
|
assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA -> BrokerB <- BrokerC
|
||||||
|
*/
|
||||||
|
public void test_AB_CB_BrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerA", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerC", "BrokerB");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
|
||||||
|
msgsB.waitForMessagesToArrive(20);
|
||||||
|
|
||||||
|
assertEquals(20, msgsB.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA <-> BrokerB <-> BrokerC
|
||||||
|
*/
|
||||||
|
public void testAllConnectedBrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerA", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerA");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerC");
|
||||||
|
bridgeBrokers("BrokerC", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerA", "BrokerC");
|
||||||
|
bridgeBrokers("BrokerC", "BrokerA");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Let's try to wait for any messages.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA <-> BrokerB <-> BrokerC
|
||||||
|
*/
|
||||||
|
public void testAllConnectedUsingMulticast() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeAllBrokers();
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Let's try to wait for any messages.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setAutoFail(true);
|
||||||
|
super.setUp();
|
||||||
|
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
|
||||||
|
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
|
||||||
|
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.network.DemandForwardingBridge;
|
||||||
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class ThreeBrokerQueueNetworkUsingTcpTest extends ThreeBrokerQueueNetworkTest {
|
||||||
|
protected List bridges = new ArrayList();
|
||||||
|
|
||||||
|
protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
|
||||||
|
List remoteTransports = remoteBroker.getTransportConnectors();
|
||||||
|
List localTransports = localBroker.getTransportConnectors();
|
||||||
|
|
||||||
|
URI remoteURI, localURI;
|
||||||
|
if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
|
||||||
|
remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
|
||||||
|
localURI = ((TransportConnector)localTransports.get(0)).getConnectUri();
|
||||||
|
|
||||||
|
// Ensure that we are connecting using tcp
|
||||||
|
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
|
||||||
|
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
|
||||||
|
TransportFactory.connect(remoteURI));
|
||||||
|
bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
|
||||||
|
bridges.add(bridge);
|
||||||
|
|
||||||
|
bridge.start();
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker or local broker is not using tcp connectors");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker or local broker has no registered connectors.");
|
||||||
|
}
|
||||||
|
|
||||||
|
MAX_SETUP_TIME = 2000;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,226 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.util.MessageIdList;
|
||||||
|
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
||||||
|
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA -> BrokerB -> BrokerC
|
||||||
|
*/
|
||||||
|
public void test_AB_BC_BrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerA", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerC");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", true);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
msgsA.waitForMessagesToArrive(10);
|
||||||
|
msgsB.waitForMessagesToArrive(20);
|
||||||
|
msgsC.waitForMessagesToArrive(20);
|
||||||
|
|
||||||
|
assertEquals(10, msgsA.getMessageCount());
|
||||||
|
assertEquals(20, msgsB.getMessageCount());
|
||||||
|
assertEquals(20, msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA <- BrokerB -> BrokerC
|
||||||
|
*/
|
||||||
|
public void test_BA_BC_BrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerB", "BrokerA");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerC");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", true);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
msgsA.waitForMessagesToArrive(20);
|
||||||
|
msgsB.waitForMessagesToArrive(10);
|
||||||
|
msgsC.waitForMessagesToArrive(20);
|
||||||
|
|
||||||
|
assertEquals(20, msgsA.getMessageCount());
|
||||||
|
assertEquals(10, msgsB.getMessageCount());
|
||||||
|
assertEquals(20, msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA -> BrokerB <- BrokerC
|
||||||
|
*/
|
||||||
|
public void test_AB_CB_BrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerA", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerC", "BrokerB");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", true);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
msgsA.waitForMessagesToArrive(10);
|
||||||
|
msgsB.waitForMessagesToArrive(30);
|
||||||
|
msgsC.waitForMessagesToArrive(10);
|
||||||
|
|
||||||
|
assertEquals(10, msgsA.getMessageCount());
|
||||||
|
assertEquals(30, msgsB.getMessageCount());
|
||||||
|
assertEquals(10, msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA <-> BrokerB <-> BrokerC
|
||||||
|
*/
|
||||||
|
public void testAllConnectedBrokerNetwork() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeBrokers("BrokerA", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerA");
|
||||||
|
bridgeBrokers("BrokerB", "BrokerC");
|
||||||
|
bridgeBrokers("BrokerC", "BrokerB");
|
||||||
|
bridgeBrokers("BrokerA", "BrokerC");
|
||||||
|
bridgeBrokers("BrokerC", "BrokerA");
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", true);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
msgsA.waitForMessagesToArrive(30);
|
||||||
|
msgsB.waitForMessagesToArrive(30);
|
||||||
|
msgsC.waitForMessagesToArrive(30);
|
||||||
|
|
||||||
|
assertEquals(30, msgsA.getMessageCount());
|
||||||
|
assertEquals(30, msgsB.getMessageCount());
|
||||||
|
assertEquals(30, msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BrokerA <-> BrokerB <-> BrokerC
|
||||||
|
*/
|
||||||
|
public void testAllConnectedUsingMulticast() throws Exception {
|
||||||
|
// Setup broker networks
|
||||||
|
bridgeAllBrokers();
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Setup destination
|
||||||
|
Destination dest = createDestination("TEST.FOO", true);
|
||||||
|
|
||||||
|
// Setup consumers
|
||||||
|
MessageConsumer clientA = createConsumer("BrokerA", dest);
|
||||||
|
MessageConsumer clientB = createConsumer("BrokerB", dest);
|
||||||
|
MessageConsumer clientC = createConsumer("BrokerC", dest);
|
||||||
|
|
||||||
|
// Send messages
|
||||||
|
sendMessages("BrokerA", dest, 10);
|
||||||
|
sendMessages("BrokerB", dest, 10);
|
||||||
|
sendMessages("BrokerC", dest, 10);
|
||||||
|
|
||||||
|
// Get message count
|
||||||
|
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||||
|
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
|
||||||
|
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||||
|
|
||||||
|
msgsA.waitForMessagesToArrive(30);
|
||||||
|
msgsB.waitForMessagesToArrive(30);
|
||||||
|
msgsC.waitForMessagesToArrive(30);
|
||||||
|
|
||||||
|
assertEquals(30, msgsA.getMessageCount());
|
||||||
|
assertEquals(30, msgsB.getMessageCount());
|
||||||
|
assertEquals(30, msgsC.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setAutoFail(true);
|
||||||
|
super.setUp();
|
||||||
|
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
|
||||||
|
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
|
||||||
|
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.network.DemandForwardingBridge;
|
||||||
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class ThreeBrokerTopicNetworkUsingTcpTest extends ThreeBrokerTopicNetworkTest {
|
||||||
|
protected List bridges = new ArrayList();
|
||||||
|
|
||||||
|
protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
|
||||||
|
List remoteTransports = remoteBroker.getTransportConnectors();
|
||||||
|
List localTransports = localBroker.getTransportConnectors();
|
||||||
|
|
||||||
|
URI remoteURI, localURI;
|
||||||
|
if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
|
||||||
|
remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
|
||||||
|
localURI = ((TransportConnector)localTransports.get(0)).getConnectUri();
|
||||||
|
|
||||||
|
// Ensure that we are connecting using tcp
|
||||||
|
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
|
||||||
|
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
|
||||||
|
TransportFactory.connect(remoteURI));
|
||||||
|
bridge.setClientId(localBroker.getBrokerName() + "_to_" + remoteBroker.getBrokerName());
|
||||||
|
bridges.add(bridge);
|
||||||
|
|
||||||
|
bridge.start();
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker or local broker is not using tcp connectors");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new Exception("Remote broker or local broker has no registered connectors.");
|
||||||
|
}
|
||||||
|
|
||||||
|
MAX_SETUP_TIME = 2000;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue