git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1239118 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-02-01 13:12:33 +00:00
parent bc78238ad0
commit 91059de283
8 changed files with 428 additions and 71 deletions

View File

@ -1367,11 +1367,20 @@ public class BrokerService implements Service {
LOG.warn("Failed to get the ConnectURI for "+tc,e);
}
if (result != null) {
// find first publishable uri
if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
this.defaultSocketURIString = result;
break;
} else {
// or use the first defined
if (this.defaultSocketURIString == null) {
this.defaultSocketURIString = result;
}
}
}
}
}
return this.defaultSocketURIString;
}
return null;

View File

@ -16,26 +16,6 @@
*/
package org.apache.activemq.broker;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@ -69,6 +49,26 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.transaction.xa.XAResource;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");

View File

@ -209,7 +209,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(getPublishableConnectString(getServer().getConnectURI()));
brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
getServer().setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
@ -402,28 +402,29 @@ public class TransportConnector implements Connector, BrokerServiceAware {
boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = "";
String self = "";
String separator = "";
if (isUpdateClusterClients()) {
if (brokerService.getDefaultSocketURIString() != null) {
self += brokerService.getDefaultSocketURIString();
self += ",";
}
if (rebalance == false) {
connectedBrokers += self;
separator = ",";
}
if (this.broker.getPeerBrokerInfos() != null) {
for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
if (isMatchesClusterFilter(info.getBrokerName())) {
connectedBrokers += separator;
connectedBrokers += info.getBrokerURL();
connectedBrokers += ",";
separator = ",";
}
}
}
if (rebalance) {
connectedBrokers += self;
connectedBrokers += separator + self;
}
}
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);

View File

@ -18,11 +18,12 @@
package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import java.io.IOException;
import java.net.URI;
class BackupTransport extends DefaultTransportListener{
private final FailoverTransport failoverTransport;
private Transport transport;
@ -76,4 +77,9 @@ class BackupTransport extends DefaultTransportListener{
}
return false;
}
@Override
public String toString() {
return "Backup transport: " + uri;
}
}

View File

@ -16,26 +16,6 @@
*/
package org.apache.activemq.transport.failover;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@ -59,6 +39,25 @@ import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
/**
* A Transport that is made reliable by being able to fail over to another
* transport when a transport failure is detected.
@ -241,6 +240,7 @@ public class FailoverTransport implements CompositeTransport {
}
if (reconnectOk) {
updated.remove(failedConnectTransportURI);
reconnectTask.wakeup();
} else {
propagateFailureToExceptionListener(e);
@ -670,6 +670,7 @@ public class FailoverTransport implements CompositeTransport {
private List<URI> getConnectList() {
ArrayList<URI> l = new ArrayList<URI>(uris);
l.addAll(updated);
boolean removed = false;
if (failedConnectTransportURI != null) {
removed = l.remove(failedConnectTransportURI);
@ -806,7 +807,6 @@ public class FailoverTransport implements CompositeTransport {
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
return false;
} else {
@ -845,7 +845,12 @@ public class FailoverTransport implements CompositeTransport {
// If we have a backup already waiting lets try it.
synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
BackupTransport bt = backups.remove(0);
ArrayList<BackupTransport> l = new ArrayList(backups);
if (randomize) {
Collections.shuffle(l);
}
BackupTransport bt = l.remove(0);
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
}
@ -1098,26 +1103,18 @@ public class FailoverTransport implements CompositeTransport {
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
if (isUpdateURIsSupported()) {
List<URI> copy = new ArrayList<URI>(this.updated);
List<URI> add = new ArrayList<URI>();
updated.clear();
if (updatedURIs != null && updatedURIs.length > 0) {
Set<URI> set = new HashSet<URI>();
for (URI uri : updatedURIs) {
if (uri != null) {
set.add(uri);
}
}
for (URI uri : set) {
if (copy.remove(uri) == false) {
add.add(uri);
if (uri != null && !uris.contains(uri)) {
updated.add(uri);
}
}
synchronized (reconnectMutex) {
this.updated.clear();
this.updated.addAll(add);
for (URI uri : copy) {
this.uris.remove(uri);
if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(updated)) {
buildBackups();
reconnect(rebalance);
}
add(rebalance, add.toArray(new URI[add.size()]));
}
}
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class FailoverClusterTestSupport extends TestCase {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private static final int NUMBER_OF_CLIENTS = 30;
private String clientUrl;
private final Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
protected void assertClientsConnectedToTwoBrokers() {
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue("Only 2 connections should be found: " + set,
set.size() == 2);
}
protected void assertClientsConnectedToThreeBrokers() {
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue("Only 3 connections should be found: " + set,
set.size() == 3);
}
protected void addBroker(String name, BrokerService brokerService) {
brokers.put(name, brokerService);
}
protected BrokerService getBroker(String name) {
return brokers.get(name);
}
protected BrokerService removeBroker(String name) {
return brokers.remove(name);
}
protected void destroyBrokerCluster() throws JMSException, Exception {
for (BrokerService b : brokers.values()) {
b.stop();
}
brokers.clear();
}
protected void shutdownClients() throws JMSException {
for (Connection c : connections) {
c.close();
}
}
protected BrokerService createBroker(String brokerName) throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
answer.setUseJmx(false);
answer.setBrokerName(brokerName);
answer.setUseShutdownHook(false);
return answer;
}
protected void addTransportConnector(BrokerService brokerService,
String connectorName, String uri, boolean clustered)
throws Exception {
TransportConnector connector = brokerService.addConnector(uri);
connector.setName(connectorName);
if (clustered) {
connector.setRebalanceClusterClients(true);
connector.setUpdateClusterClients(true);
connector.setUpdateClusterClientsOnRemove(true);
} else {
connector.setRebalanceClusterClients(false);
connector.setUpdateClusterClients(false);
connector.setUpdateClusterClientsOnRemove(false);
}
}
protected void addNetworkBridge(BrokerService answer, String bridgeName,
String uri, boolean duplex, String destinationFilter)
throws Exception {
NetworkConnector network = answer.addNetworkConnector(uri);
network.setName(bridgeName);
network.setDuplex(duplex);
if (destinationFilter != null && !destinationFilter.equals("")) {
network.setDestinationFilter(bridgeName);
}
}
@SuppressWarnings("unused")
protected void createClients() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
clientUrl);
for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory
.createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
connections.add(c);
}
}
public String getClientUrl() {
return clientUrl;
}
public void setClientUrl(String clientUrl) {
this.clientUrl = clientUrl;
}
}

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
/**
* Complex cluster test that will exercise the dynamic failover capabilities of
* a network of brokers. Using a networking of 3 brokers where the 3rd broker is
* removed and then added back in it is expected in each test that the number of
* connections on the client should start with 3, then have two after the 3rd
* broker is removed and then show 3 after the 3rd broker is reintroduced.
*/
public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617";
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618";
private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626";
private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627";
private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628";
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
private static final String BROKER_C_NAME = "BROKERC";
public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
initSingleTcBroker("", null);
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
Thread.sleep(2000);
runTests(false);
}
public void testThreeBrokerClusterSingleConnectorBackup() throws Exception {
initSingleTcBroker("", null);
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2");
createClients();
Thread.sleep(2000);
runTests(false);
}
public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
initSingleTcBroker("?transport.closeAsync=false", null);
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
runTests(false);
}
public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
initMultiTcCluster("", null);
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
runTests(true);
}
/**
* Runs a 3 tests: <br/>
* <ul>
* <li>asserts clients are distributed across all 3 brokers</li>
* <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
* <li>asserts clients are distributed across all 3 brokers after reintroducing the 3rd broker</li>
* </ul>
* @throws Exception
* @throws InterruptedException
*/
private void runTests(boolean multi) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers();
getBroker(BROKER_C_NAME).stop();
getBroker(BROKER_C_NAME).waitUntilStopped();
removeBroker(BROKER_C_NAME);
Thread.sleep(5000);
assertClientsConnectedToTwoBrokers();
createBrokerC(multi, "", null);
getBroker(BROKER_C_NAME).waitUntilStarted();
Thread.sleep(5000);
assertClientsConnectedToThreeBrokers();
}
@Override
protected void setUp() throws Exception {
}
@Override
protected void tearDown() throws Exception {
shutdownClients();
destroyBrokerCluster();
Thread.sleep(2000);
}
private void initSingleTcBroker(String params, String clusterFilter) throws Exception {
createBrokerA(false, params, clusterFilter);
createBrokerB(false, params, clusterFilter);
createBrokerC(false, params, clusterFilter);
getBroker(BROKER_C_NAME).waitUntilStarted();
}
private void initMultiTcCluster(String params, String clusterFilter) throws Exception {
createBrokerA(true, params, clusterFilter);
createBrokerB(true, params, clusterFilter);
createBrokerC(true, params, clusterFilter);
getBroker(BROKER_C_NAME).waitUntilStarted();
}
private void createBrokerA(boolean multi, String params, String clusterFilter) throws Exception {
if (getBroker(BROKER_A_NAME) == null) {
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + params, true);
if (multi) {
addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS, false);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
} else {
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
}
getBroker(BROKER_A_NAME).start();
}
}
private void createBrokerB(boolean multi, String params, String clusterFilter) throws Exception {
if (getBroker(BROKER_B_NAME) == null) {
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + params, true);
if (multi) {
addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS, false);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
} else {
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
}
getBroker(BROKER_B_NAME).start();
}
}
private void createBrokerC(boolean multi, String params, String clusterFilter) throws Exception {
if (getBroker(BROKER_C_NAME) == null) {
addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + params, true);
if (multi) {
addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS, false);
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
} else {
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
}
getBroker(BROKER_C_NAME).start();
}
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.MessageAck;
@ -32,6 +29,10 @@ import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import static org.junit.Assert.*;
public class FailoverTransportTest {
@ -99,7 +100,6 @@ public class FailoverTransportTest {
// Track a connection
tracker.track(connection);
try {
this.transport.oneway(new RemoveInfo(new ConnectionId("1")));
} catch(Exception e) {
@ -128,7 +128,7 @@ public class FailoverTransportTest {
protected Transport createTransport() throws Exception {
Transport transport = TransportFactory.connect(
new URI("failover://(tcp://doesNotExist:1234)"));
new URI("failover://(tcp://localhost:1234)"));
transport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {