git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1241061 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-02-06 16:25:44 +00:00
parent e73230b4a3
commit 7388438a85
3 changed files with 260 additions and 25 deletions

View File

@ -115,6 +115,11 @@ public class FailoverTransport implements CompositeTransport {
private String updateURIsURL = null;
private boolean rebalanceUpdateURIs = true;
private boolean doRebalance = false;
private boolean connectedToPriority = false;
private boolean priorityBackup = false;
private ArrayList<URI> priorityList = new ArrayList<URI>();
private boolean priorityBackupAvailable = false;
public FailoverTransport() throws InterruptedIOException {
brokerSslContext = SslContext.getCurrentSslContext();
@ -128,13 +133,22 @@ public class FailoverTransport implements CompositeTransport {
}
boolean buildBackup = true;
synchronized (backupMutex) {
if ((connectedTransport.get() == null || doRebalance) && !disposed) {
if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
result = doReconnect();
buildBackup = false;
connectedToPriority = isPriority(connectedTransportURI);
}
}
if (buildBackup) {
buildBackups();
if (priorityBackup && !connectedToPriority) {
try {
doDelay();
reconnectTask.wakeup();
} catch (InterruptedException e) {
LOG.debug("Reconnect task has been interrupted.", e);
}
}
} else {
// build backups on the next iteration
buildBackup = true;
@ -471,6 +485,27 @@ public class FailoverTransport implements CompositeTransport {
this.maxCacheSize = maxCacheSize;
}
public boolean isPriorityBackup() {
return priorityBackup;
}
public void setPriorityBackup(boolean priorityBackup) {
this.priorityBackup = priorityBackup;
}
public void setPriorityURIs(String priorityURIs) {
StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
while (tokenizer.hasMoreTokens()) {
String str = tokenizer.nextToken();
try {
URI uri = new URI(str);
priorityList.add(uri);
} catch (Exception e) {
LOG.error("Failed to parse broker address: " + str, e);
}
}
}
public void oneway(Object o) throws IOException {
Command command = (Command) o;
@ -807,7 +842,7 @@ public class FailoverTransport implements CompositeTransport {
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
return false;
} else {
List<URI> connectList = getConnectList();
@ -844,7 +879,7 @@ public class FailoverTransport implements CompositeTransport {
// If we have a backup already waiting lets try it.
synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
if ((priorityBackup || backup) && !backups.isEmpty()) {
ArrayList<BackupTransport> l = new ArrayList(backups);
if (randomize) {
Collections.shuffle(l);
@ -853,6 +888,13 @@ public class FailoverTransport implements CompositeTransport {
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
if (priorityBackup && priorityBackupAvailable) {
Transport old = this.connectedTransport.getAndSet(null);
if (transport != null) {
disposeTransport(old);
}
priorityBackupAvailable = false;
}
}
}
@ -979,30 +1021,33 @@ public class FailoverTransport implements CompositeTransport {
}
if (!disposed) {
doDelay();
}
if (reconnectDelay > 0) {
synchronized (sleepMutex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
}
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return !disposed;
}
private void doDelay() {
if (reconnectDelay > 0) {
synchronized (sleepMutex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
}
}
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
return !disposed;
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
}
}
}
private void resetReconnectDelay() {
@ -1035,8 +1080,14 @@ public class FailoverTransport implements CompositeTransport {
final boolean buildBackups() {
synchronized (backupMutex) {
if (!disposed && backup && backups.size() < backupPoolSize) {
if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
List<URI> connectList = getConnectList();
for (URI uri: connectList) {
if (!backupList.contains(uri)) {
backupList.add(uri);
}
}
// removed disposed backups
List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
for (BackupTransport bt : backups) {
@ -1046,7 +1097,7 @@ public class FailoverTransport implements CompositeTransport {
}
backups.removeAll(disposedList);
disposedList.clear();
for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
@ -1059,6 +1110,9 @@ public class FailoverTransport implements CompositeTransport {
t.start();
bt.setTransport(t);
backups.add(bt);
if (priorityBackup && isPriority(uri)) {
priorityBackupAvailable = true;
}
}
} catch (Exception e) {
LOG.debug("Failed to build backup ", e);
@ -1071,6 +1125,13 @@ public class FailoverTransport implements CompositeTransport {
}
return false;
}
protected boolean isPriority(URI uri) {
if (!priorityList.isEmpty()) {
return priorityList.contains(uri);
}
return uris.indexOf(uri) == 0;
}
public boolean isDisposed() {
return disposed;

View File

@ -65,6 +65,12 @@ public class FailoverClusterTestSupport extends TestCase {
set.size() == 3);
}
protected void assertAllConnectedTo(String url) throws Exception {
for (ActiveMQConnection c : connections) {
assertEquals(c.getTransportChannel().getRemoteAddress(), url);
}
}
protected void addBroker(String name, BrokerService brokerService) {
brokers.put(name, brokerService);
}
@ -72,6 +78,12 @@ public class FailoverClusterTestSupport extends TestCase {
protected BrokerService getBroker(String name) {
return brokers.get(name);
}
protected void stopBroker(String name) throws Exception {
BrokerService broker = brokers.remove(name);
broker.stop();
broker.waitUntilStopped();
}
protected BrokerService removeBroker(String name) {
return brokers.remove(name);
@ -126,11 +138,14 @@ public class FailoverClusterTestSupport extends TestCase {
}
}
@SuppressWarnings("unused")
protected void createClients() throws Exception {
createClients(NUMBER_OF_CLIENTS);
}
protected void createClients(int num) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
clientUrl);
for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
for (int i = 0; i < num; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory
.createConnection();
c.start();

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
public class FailoverPriorityTest extends FailoverClusterTestSupport {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
private HashMap<String,String> urls = new HashMap<String,String>();
@Override
public void setUp() throws Exception {
super.setUp();
urls.put(BROKER_A_NAME, BROKER_A_CLIENT_TC_ADDRESS);
urls.put(BROKER_B_NAME, BROKER_B_CLIENT_TC_ADDRESS);
}
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
public void testPriorityBackup() throws Exception {
createBrokerA();
createBrokerB();
getBroker(BROKER_B_NAME).waitUntilStarted();
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
createClients(5);
assertAllConnectedTo(urls.get(BROKER_A_NAME));
restart(false, BROKER_A_NAME, BROKER_B_NAME);
for (int i = 0; i < 3; i++) {
restart(true, BROKER_A_NAME, BROKER_B_NAME);
}
Thread.sleep(5000);
restart(false, BROKER_A_NAME, BROKER_B_NAME);
}
public void testPriorityBackupList() throws Exception {
createBrokerA();
createBrokerB();
getBroker(BROKER_B_NAME).waitUntilStarted();
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&priorityURIs=tcp://127.0.0.1:61617&initialReconnectDelay=1000&useExponentialBackOff=false");
createClients(5);
Thread.sleep(3000);
assertAllConnectedTo(urls.get(BROKER_B_NAME));
restart(false, BROKER_B_NAME, BROKER_A_NAME);
for (int i = 0; i < 3; i++) {
restart(true, BROKER_B_NAME, BROKER_A_NAME);
}
restart(false, BROKER_B_NAME, BROKER_A_NAME);
}
private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
Thread.sleep(1000);
if (primary) {
LOG.info("Stopping " + primaryName);
stopBroker(primaryName);
} else {
LOG.info("Stopping " + secondaryName);
stopBroker(secondaryName);
}
Thread.sleep(5000);
if (primary) {
assertAllConnectedTo(urls.get(secondaryName));
} else {
assertAllConnectedTo(urls.get(primaryName));
}
if (primary) {
LOG.info("Starting " + primaryName);
createBrokerByName(primaryName);
getBroker(primaryName).waitUntilStarted();
} else {
LOG.info("Starting " + secondaryName);
createBrokerByName(secondaryName);
getBroker(secondaryName).waitUntilStarted();
}
Thread.sleep(5000);
assertAllConnectedTo(urls.get(primaryName));
}
private void createBrokerByName(String name) throws Exception {
if (name.equals(BROKER_A_NAME)) {
createBrokerA();
} else if (name.equals(BROKER_B_NAME)) {
createBrokerB();
} else {
throw new Exception("Unknown broker " + name);
}
}
private void createBrokerA() throws Exception {
if (getBroker(BROKER_A_NAME) == null) {
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
getBroker(BROKER_A_NAME).start();
}
}
private void createBrokerB() throws Exception {
if (getBroker(BROKER_B_NAME) == null) {
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
getBroker(BROKER_B_NAME).start();
}
}
@Override
protected void tearDown() throws Exception {
shutdownClients();
destroyBrokerCluster();
}
}