mirror of
https://github.com/apache/activemq.git
synced 2025-02-12 21:16:06 +00:00
https://issues.apache.org/jira/browse/AMQ-4461 - priority backup with multiple brokers
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1465683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2b5d9b152a
commit
585a966a9b
@ -1169,9 +1169,11 @@ public class FailoverTransport implements CompositeTransport {
|
|||||||
t.setTransportListener(bt);
|
t.setTransportListener(bt);
|
||||||
t.start();
|
t.start();
|
||||||
bt.setTransport(t);
|
bt.setTransport(t);
|
||||||
backups.add(bt);
|
|
||||||
if (priorityBackup && isPriority(uri)) {
|
if (priorityBackup && isPriority(uri)) {
|
||||||
priorityBackupAvailable = true;
|
priorityBackupAvailable = true;
|
||||||
|
backups.add(0, bt);
|
||||||
|
} else {
|
||||||
|
backups.add(bt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -16,10 +16,12 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.failover;
|
package org.apache.activemq.transport.failover;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
||||||
|
|
||||||
@ -27,6 +29,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
|||||||
|
|
||||||
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
|
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
|
||||||
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
|
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
|
||||||
|
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
|
||||||
private HashMap<String,String> urls = new HashMap<String,String>();
|
private HashMap<String,String> urls = new HashMap<String,String>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -38,6 +41,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
|||||||
|
|
||||||
private static final String BROKER_A_NAME = "BROKERA";
|
private static final String BROKER_A_NAME = "BROKERA";
|
||||||
private static final String BROKER_B_NAME = "BROKERB";
|
private static final String BROKER_B_NAME = "BROKERB";
|
||||||
|
private static final String BROKER_C_NAME = "BROKERC";
|
||||||
|
|
||||||
|
|
||||||
public void testPriorityBackup() throws Exception {
|
public void testPriorityBackup() throws Exception {
|
||||||
@ -87,6 +91,42 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testThreeBrokers() throws Exception {
|
||||||
|
// Broker A
|
||||||
|
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
|
||||||
|
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
|
||||||
|
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
|
||||||
|
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
|
||||||
|
getBroker(BROKER_A_NAME).start();
|
||||||
|
|
||||||
|
// Broker B
|
||||||
|
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
|
||||||
|
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
|
||||||
|
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
|
||||||
|
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
|
||||||
|
getBroker(BROKER_B_NAME).start();
|
||||||
|
|
||||||
|
// Broker C
|
||||||
|
addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
|
||||||
|
addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS, false);
|
||||||
|
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
|
||||||
|
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
|
||||||
|
getBroker(BROKER_C_NAME).start();
|
||||||
|
|
||||||
|
|
||||||
|
getBroker(BROKER_C_NAME).waitUntilStarted();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false&backupPoolSize=2");
|
||||||
|
|
||||||
|
createClients(5);
|
||||||
|
|
||||||
|
assertAllConnectedTo(urls.get(BROKER_A_NAME));
|
||||||
|
|
||||||
|
restart(true, BROKER_A_NAME, BROKER_B_NAME);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
|
private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user