From 585a966a9b512d0e1709e067f82d1998a8701d2b Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Mon, 8 Apr 2013 16:58:21 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4461 - priority backup with multiple brokers git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1465683 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/FailoverTransport.java | 4 +- .../failover/FailoverPriorityTest.java | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 1e12441609..5c97a66852 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -1169,9 +1169,11 @@ public class FailoverTransport implements CompositeTransport { t.setTransportListener(bt); t.start(); bt.setTransport(t); - backups.add(bt); if (priorityBackup && isPriority(uri)) { priorityBackupAvailable = true; + backups.add(0, bt); + } else { + backups.add(bt); } } } catch (Exception e) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index dde1c3546e..d80e6e4477 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.transport.failover; +import org.apache.activemq.broker.BrokerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.Map; public class FailoverPriorityTest extends FailoverClusterTestSupport { @@ -27,6 +29,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616"; private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617"; + private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618"; private HashMap urls = new HashMap(); @Override @@ -38,6 +41,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { private static final String BROKER_A_NAME = "BROKERA"; private static final String BROKER_B_NAME = "BROKERB"; + private static final String BROKER_C_NAME = "BROKERC"; public void testPriorityBackup() throws Exception { @@ -86,6 +90,42 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { restart(false, BROKER_B_NAME, BROKER_A_NAME); } + + public void testThreeBrokers() throws Exception { + // Broker A + addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); + addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false); + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + getBroker(BROKER_A_NAME).start(); + + // Broker B + addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); + addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false); + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + getBroker(BROKER_B_NAME).start(); + + // Broker C + addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME)); + addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS, false); + addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + getBroker(BROKER_C_NAME).start(); + + + getBroker(BROKER_C_NAME).waitUntilStarted(); + Thread.sleep(1000); + + setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false&backupPoolSize=2"); + + createClients(5); + + assertAllConnectedTo(urls.get(BROKER_A_NAME)); + + restart(true, BROKER_A_NAME, BROKER_B_NAME); + + } private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {