git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@999816 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-09-22 08:50:54 +00:00
parent acb8164bc6
commit 9041c3fc78
5 changed files with 63 additions and 16 deletions

View File

@ -85,25 +85,29 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true);
bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true, false);
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit);
return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit, false);
}
// Overwrite this method to specify how you want to bridge the two brokers
// By default, bridge them using add network connector of the local broker
// and the first connector of the remote broker
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
remoteURI = transportConnectors.get(0).getConnectUri();
NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:" + remoteURI));
String uri = "static:(" + remoteURI + ")";
if (failover) {
uri = "static:(failover:(" + remoteURI + "))";
}
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
connector.setConduitSubscriptions(conduit);

View File

@ -20,6 +20,9 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.MessageConsumer;
@ -28,6 +31,8 @@ import java.net.URI;
public class AMQ2927Test extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(AMQ2927Test.class);
ActiveMQQueue queue = new ActiveMQQueue("TEST");
@Override
@ -46,31 +51,72 @@ public class AMQ2927Test extends JmsMultipleBrokersTestSupport {
}
public void testFailoverRestart() throws Exception {
public void testRestartSend() throws Exception {
Thread.sleep(1000);
System.out.println("restarting broker");
LOG.info("restarting broker");
restartBroker("BrokerA");
Thread.sleep(5000);
System.out.println("sending message");
LOG.info("sending message");
sendMessages("BrokerA", queue, 1);
Thread.sleep(3000);
System.out.println("consuming message");
LOG.info("consuming message");
MessageConsumer consumerA = createConsumer("BrokerA", queue);
MessageConsumer consumerB = createConsumer("BrokerB", queue);
Thread.sleep(1000);
System.out.println("consumerA = " + getConsumerMessages("BrokerA", consumerA));
System.out.println("consumerB = " + getConsumerMessages("BrokerB", consumerB));
MessageIdList messagesA = getConsumerMessages("BrokerA", consumerA);
MessageIdList messagesB = getConsumerMessages("BrokerB", consumerB);
LOG.info("consumerA = " + messagesA);
LOG.info("consumerB = " + messagesB);
messagesA.assertMessagesReceived(0);
messagesB.assertMessagesReceived(1);
}
public void testSendRestart() throws Exception {
Thread.sleep(1000);
LOG.info("sending message");
sendMessages("BrokerA", queue, 1);
Thread.sleep(3000);
LOG.info("restarting broker");
restartBroker("BrokerA");
Thread.sleep(5000);
LOG.info("consuming message");
MessageConsumer consumerA = createConsumer("BrokerA", queue);
MessageConsumer consumerB = createConsumer("BrokerB", queue);
Thread.sleep(1000);
MessageIdList messagesA = getConsumerMessages("BrokerA", consumerA);
MessageIdList messagesB = getConsumerMessages("BrokerB", consumerB);
LOG.info("consumerA = " + messagesA);
LOG.info("consumerB = " + messagesB);
messagesA.assertMessagesReceived(0);
messagesB.assertMessagesReceived(1);
}
protected void restartBroker(String brokerName) throws Exception {

View File

@ -26,7 +26,6 @@ import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DiscoveryNetworkConnector;
@ -136,7 +135,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {

View File

@ -26,13 +26,11 @@ import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTestSupport {
@ -111,7 +109,7 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC&useLocalHost=false"));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);

View File

@ -38,7 +38,7 @@ public class ThreeBrokerStompTemporaryQueueTest extends JmsMultipleBrokersTestSu
private static final Log LOG = LogFactory.getLog(ThreeBrokerStompTemporaryQueueTest.class);
private StompConnection stompConnection;
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {