git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177088 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-09-28 22:07:21 +00:00
parent d9124246b7
commit b6b69113b1
4 changed files with 229 additions and 101 deletions

View File

@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -37,6 +35,7 @@ import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;
@ -60,12 +59,9 @@ import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* A Transport that is made reliable by being able to fail over to another * A Transport that is made reliable by being able to fail over to another
* transport when a transport failure is detected. * transport when a transport failure is detected.
*
*
*/ */
public class FailoverTransport implements CompositeTransport { public class FailoverTransport implements CompositeTransport {
@ -114,12 +110,12 @@ public class FailoverTransport implements CompositeTransport {
//private boolean connectionInterruptProcessingComplete; //private boolean connectionInterruptProcessingComplete;
private final TransportListener myTransportListener = createTransportListener(); private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported=true; private boolean updateURIsSupported = true;
private boolean reconnectSupported=true; private boolean reconnectSupported = true;
// remember for reconnect thread // remember for reconnect thread
private SslContext brokerSslContext; private SslContext brokerSslContext;
private String updateURIsURL = null; private String updateURIsURL = null;
private boolean rebalanceUpdateURIs=true; private boolean rebalanceUpdateURIs = true;
private boolean doRebalance = false; private boolean doRebalance = false;
public FailoverTransport() throws InterruptedIOException { public FailoverTransport() throws InterruptedIOException {
@ -130,7 +126,6 @@ public class FailoverTransport implements CompositeTransport {
public boolean iterate() { public boolean iterate() {
boolean result = false; boolean result = false;
boolean buildBackup = true; boolean buildBackup = true;
boolean doReconnect = !disposed;
synchronized (backupMutex) { synchronized (backupMutex) {
if ((connectedTransport.get() == null || doRebalance) && !disposed) { if ((connectedTransport.get() == null || doRebalance) && !disposed) {
result = doReconnect(); result = doReconnect();
@ -174,7 +169,7 @@ public class FailoverTransport implements CompositeTransport {
initialized = true; initialized = true;
} }
if(command.isConnectionControl()) { if (command.isConnectionControl()) {
handleConnectionControl((ConnectionControl) command); handleConnectionControl((ConnectionControl) command);
} }
if (transportListener != null) { if (transportListener != null) {
@ -238,8 +233,7 @@ public class FailoverTransport implements CompositeTransport {
connectedTransportURI = null; connectedTransportURI = null;
connected = false; connected = false;
// notify before any reconnect attempt so ack state can be // notify before any reconnect attempt so ack state can be whacked
// whacked
if (transportListener != null) { if (transportListener != null) {
transportListener.transportInterupted(); transportListener.transportInterupted();
} }
@ -292,7 +286,6 @@ public class FailoverTransport implements CompositeTransport {
LOG.error("Failed to update transport URI's from: " + newTransports, e); LOG.error("Failed to update transport URI's from: " + newTransports, e);
} }
} }
} }
} }
} }
@ -416,8 +409,7 @@ public class FailoverTransport implements CompositeTransport {
} }
/** /**
* @param randomize * @param randomize The randomize to set.
* The randomize to set.
*/ */
public void setRandomize(boolean randomize) { public void setRandomize(boolean randomize) {
this.randomize = randomize; this.randomize = randomize;
@ -571,7 +563,6 @@ public class FailoverTransport implements CompositeTransport {
// the outer catch // the outer catch
throw e; throw e;
} }
} }
return; return;
@ -613,9 +604,9 @@ public class FailoverTransport implements CompositeTransport {
public void add(boolean rebalance, URI u[]) { public void add(boolean rebalance, URI u[]) {
boolean newURI = false; boolean newURI = false;
for (int i = 0; i < u.length; i++) { for (URI uri : u) {
if (!contains(u[i])) { if (!contains(uri)) {
uris.add(u[i]); uris.add(uri);
newURI = true; newURI = true;
} }
} }
@ -625,8 +616,8 @@ public class FailoverTransport implements CompositeTransport {
} }
public void remove(boolean rebalance, URI u[]) { public void remove(boolean rebalance, URI u[]) {
for (int i = 0; i < u.length; i++) { for (URI uri : u) {
uris.remove(u[i]); uris.remove(uri);
} }
// rebalance is automatic if any connected to removed/stopped broker // rebalance is automatic if any connected to removed/stopped broker
} }
@ -634,7 +625,7 @@ public class FailoverTransport implements CompositeTransport {
public void add(boolean rebalance, String u) { public void add(boolean rebalance, String u) {
try { try {
URI newURI = new URI(u); URI newURI = new URI(u);
if (contains(newURI)==false) { if (contains(newURI) == false) {
uris.add(newURI); uris.add(newURI);
reconnect(rebalance); reconnect(rebalance);
} }
@ -680,7 +671,9 @@ public class FailoverTransport implements CompositeTransport {
if (removed) { if (removed) {
l.add(failedConnectTransportURI); l.add(failedConnectTransportURI);
} }
if (LOG.isDebugEnabled()) {
LOG.debug("urlList connectionList:" + l + ", from: " + uris); LOG.debug("urlList connectionList:" + l + ", from: " + uris);
}
return l; return l;
} }
@ -715,12 +708,11 @@ public class FailoverTransport implements CompositeTransport {
cc.setFaultTolerant(true); cc.setFaultTolerant(true);
t.oneway(cc); t.oneway(cc);
stateTracker.restore(t); stateTracker.restore(t);
Map tmpMap = null; Map<Integer, Command> tmpMap = null;
synchronized (requestMap) { synchronized (requestMap) {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap); tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
} }
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) { for (Command command : tmpMap.values()) {
Command command = iter2.next();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("restore requestMap, replay: " + command); LOG.trace("restore requestMap, replay: " + command);
} }
@ -753,9 +745,7 @@ public class FailoverTransport implements CompositeTransport {
return true; return true;
} }
final boolean doReconnect() { private void doUpdateURIsFromDisk() {
Exception failure = null;
synchronized (reconnectMutex) {
// If updateURIsURL is specified, read the file and add any new // If updateURIsURL is specified, read the file and add any new
// transport URI's to this FailOverTransport. // transport URI's to this FailOverTransport.
@ -790,6 +780,14 @@ public class FailoverTransport implements CompositeTransport {
processNewTransports(isRebalanceUpdateURIs(), newUris); processNewTransports(isRebalanceUpdateURIs(), newUris);
} }
}
final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
// First ensure we are up to date.
doUpdateURIsFromDisk();
if (disposed || connectionFailure != null) { if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
@ -808,16 +806,20 @@ public class FailoverTransport implements CompositeTransport {
doRebalance = false; doRebalance = false;
return false; return false;
} else { } else {
if (LOG.isDebugEnabled()) {
LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
}
try { try {
Transport transport = this.connectedTransport.getAndSet(null); Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) { if (transport != null) {
disposeTransport(transport); disposeTransport(transport);
} }
} catch (Exception e) { } catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Caught an exception stopping existing transport for rebalance", e); LOG.debug("Caught an exception stopping existing transport for rebalance", e);
} }
} }
}
doRebalance = false; doRebalance = false;
} }
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
@ -847,12 +849,27 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
// Sleep for the reconnectDelay
if (!firstConnection && (reconnectDelay > 0) && !disposed) {
synchronized (sleepMutex) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Iterator<URI> iter = connectList.iterator(); Iterator<URI> iter = connectList.iterator();
while (iter.hasNext() && connectedTransport.get() == null && !disposed) { while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
URI uri = iter.next(); URI uri = iter.next();
Transport t = null; Transport t = null;
try { try {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting connect to: " + uri); LOG.debug("Attempting connect to: " + uri);
}
SslContext.setCurrentSslContext(brokerSslContext); SslContext.setCurrentSslContext(brokerSslContext);
t = TransportFactory.compositeConnect(uri); t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener); t.setTransportListener(myTransportListener);
@ -924,8 +941,7 @@ public class FailoverTransport implements CompositeTransport {
connectionFailure = failure; connectionFailure = failure;
// Make sure on initial startup, that the transportListener has // Make sure on initial startup, that the transportListener has
// been initialized // been initialized for this instance.
// for this instance.
synchronized (listenerMutex) { synchronized (listenerMutex) {
if (transportListener == null) { if (transportListener == null) {
try { try {
@ -946,16 +962,19 @@ public class FailoverTransport implements CompositeTransport {
return false; return false;
} }
} }
if (!disposed) { if (!disposed) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); if (reconnectDelay > 0) {
synchronized (sleepMutex) { synchronized (sleepMutex) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
try { try {
sleepMutex.wait(reconnectDelay); sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
}
if (useExponentialBackOff) { if (useExponentialBackOff) {
// Exponential increment of reconnect delay. // Exponential increment of reconnect delay.
@ -965,6 +984,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
} }
return !disposed; return !disposed;
} }
@ -981,7 +1001,7 @@ public class FailoverTransport implements CompositeTransport {
} }
backups.removeAll(disposedList); backups.removeAll(disposedList);
disposedList.clear(); disposedList.clear();
for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) { for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
URI uri = iter.next(); URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try { try {
@ -1016,7 +1036,7 @@ public class FailoverTransport implements CompositeTransport {
} }
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
add(true, new URI[] { uri }); add(true, new URI[]{uri});
} }
public boolean isReconnectSupported() { public boolean isReconnectSupported() {
@ -1024,7 +1044,7 @@ public class FailoverTransport implements CompositeTransport {
} }
public void setReconnectSupported(boolean value) { public void setReconnectSupported(boolean value) {
this.reconnectSupported=value; this.reconnectSupported = value;
} }
public boolean isUpdateURIsSupported() { public boolean isUpdateURIsSupported() {
@ -1032,7 +1052,7 @@ public class FailoverTransport implements CompositeTransport {
} }
public void setUpdateURIsSupported(boolean value) { public void setUpdateURIsSupported(boolean value) {
this.updateURIsSupported=value; this.updateURIsSupported = value;
} }
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
@ -1041,8 +1061,7 @@ public class FailoverTransport implements CompositeTransport {
List<URI> add = new ArrayList<URI>(); List<URI> add = new ArrayList<URI>();
if (updatedURIs != null && updatedURIs.length > 0) { if (updatedURIs != null && updatedURIs.length > 0) {
Set<URI> set = new HashSet<URI>(); Set<URI> set = new HashSet<URI>();
for (int i = 0; i < updatedURIs.length; i++) { for (URI uri : updatedURIs) {
URI uri = updatedURIs[i];
if (uri != null) { if (uri != null) {
set.add(uri); set.add(uri);
} }
@ -1114,8 +1133,8 @@ public class FailoverTransport implements CompositeTransport {
boolean result = false; boolean result = false;
try { try {
for (URI uri:uris) { for (URI uri : uris) {
if (newURI.getPort()==uri.getPort()) { if (newURI.getPort() == uri.getPort()) {
InetAddress newAddr = InetAddress.getByName(newURI.getHost()); InetAddress newAddr = InetAddress.getByName(newURI.getHost());
InetAddress addr = InetAddress.getByName(uri.getHost()); InetAddress addr = InetAddress.getByName(uri.getHost());
if (addr.equals(newAddr)) { if (addr.equals(newAddr)) {
@ -1124,7 +1143,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
} }
}catch(IOException e) { } catch (IOException e) {
result = true; result = true;
LOG.error("Failed to verify URI " + newURI + " already known: " + e); LOG.error("Failed to verify URI " + newURI + " already known: " + e);
} }

View File

@ -58,7 +58,7 @@ public class FailoverTransportFactory extends TransportFactory {
* @throws IOException * @throws IOException
*/ */
public Transport createTransport(CompositeData compositData) throws IOException { public Transport createTransport(CompositeData compositData) throws IOException {
Map options = compositData.getParameters(); Map<String, String> options = compositData.getParameters();
FailoverTransport transport = createTransport(options); FailoverTransport transport = createTransport(options);
if (!options.isEmpty()) { if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options); throw new IllegalArgumentException("Invalid connect parameters: " + options);
@ -67,7 +67,7 @@ public class FailoverTransportFactory extends TransportFactory {
return transport; return transport;
} }
public FailoverTransport createTransport(Map parameters) throws IOException { public FailoverTransport createTransport(Map<String, String> parameters) throws IOException {
FailoverTransport transport = new FailoverTransport(); FailoverTransport transport = new FailoverTransport();
IntrospectionSupport.setProperties(transport, parameters); IntrospectionSupport.setProperties(transport, parameters);
return transport; return transport;

View File

@ -36,7 +36,6 @@ public class FailoverConsumerTest extends NetworkTestSupport {
public static final int MSG_COUNT = 100; public static final int MSG_COUNT = 100;
private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerTest.class); private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerTest.class);
public void testPublisherFailsOver() throws Exception { public void testPublisherFailsOver() throws Exception {
// Uncomment this if you want to use remote broker created by // Uncomment this if you want to use remote broker created by
// NetworkTestSupport. // NetworkTestSupport.
@ -72,7 +71,7 @@ public class FailoverConsumerTest extends NetworkTestSupport {
// though). // though).
// So we must use external broker ant restart it manually. // So we must use external broker ant restart it manually.
LOG.info("You should restart remote broker now and press enter!"); LOG.info("You should restart remote broker now and press enter!");
System.in.read(); //System.in.read();
// Thread.sleep(20000); // Thread.sleep(20000);
restartRemoteBroker(); restartRemoteBroker();
msg.acknowledge(); msg.acknowledge();
@ -114,6 +113,6 @@ public class FailoverConsumerTest extends NetworkTestSupport {
} }
protected String getRemoteURI() { protected String getRemoteURI() {
return "tcp://localhost:55555"; return "tcp://localhost:61616";
} }
} }

View File

@ -0,0 +1,110 @@
package org.apache.activemq.transport.failover;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertTrue;
public class InitalReconnectDelayTest {
private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
protected BrokerService broker1;
protected BrokerService broker2;
protected CountDownLatch broker2Started = new CountDownLatch(1);
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000";
@Test
public void testInitialReconnectDelay() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("foo");
MessageProducer producer = session.createProducer(destination);
long start = (new Date()).getTime();
producer.send(session.createTextMessage("TEST"));
long end = (new Date()).getTime();
//Verify we can send quickly
assertTrue((end - start) < 2000);
//Halt the broker1...
LOG.info("Stopping the Broker1...");
broker1.stop();
LOG.info("Attempting to send... failover should kick in...");
start = (new Date()).getTime();
producer.send(session.createTextMessage("TEST"));
end = (new Date()).getTime();
//Inital reconnection should kick in and be darned close to what we expected
LOG.info("Failover took " + (end - start) + " ms.");
assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
}
@Before
public void setUp() throws Exception {
final String dataDir = "target/data/shared";
broker1 = new BrokerService();
broker1.setBrokerName("broker1");
broker1.setDeleteAllMessagesOnStartup(true);
broker1.setDataDirectory(dataDir);
broker1.addConnector("tcp://localhost:62001");
broker1.setUseJmx(false);
broker1.start();
broker1.waitUntilStarted();
broker2 = new BrokerService();
broker2.setBrokerName("broker2");
broker2.setDataDirectory(dataDir);
broker2.setUseJmx(false);
broker2.addConnector("tcp://localhost:62002");
broker2.start();
broker2.waitUntilStarted();
}
protected String getSlaveXml() {
return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
}
protected String getMasterXml() {
return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
}
@After
public void tearDown() throws Exception {
if (broker1.isStarted()) {
broker1.stop();
broker1.waitUntilStopped();
}
if (broker2.isStarted()) {
broker2.stop();
broker2.waitUntilStopped();
}
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uriString);
}
}