diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 7dca751c6b..512c0cb85f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -21,6 +21,7 @@ import java.io.InterruptedIOException; import java.net.URI; import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -34,6 +35,7 @@ import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transport.CompositeTransport; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; @@ -78,6 +80,12 @@ public class FailoverTransport implements CompositeTransport { private int connectFailures; private long reconnectDelay = initialReconnectDelay; private Exception connectionFailure; + private boolean firstConnection = true; + //optionally always have a backup created + private boolean backup=false; + private URI backupTransportURI; + private Transport backupTransport; + private final TransportListener myTransportListener = createTransportListener(); @@ -100,22 +108,44 @@ public class FailoverTransport implements CompositeTransport { if (connectedTransport != null || disposed || connectionFailure != null) { return false; } else { - ArrayList connectList = getConnectList(); + List connectList = getConnectList(); if (connectList.isEmpty()) { failure = new IOException("No uris available to connect to."); } else { if (!useExponentialBackOff) { reconnectDelay = initialReconnectDelay; } - Iterator iter = connectList.iterator(); - for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) { - URI uri = (URI)iter.next(); + if (backup && backupTransport != null) { + Transport t = backupTransport; + URI uri = backupTransportURI; + backupTransport=null; + backupTransportURI=null; + t.setTransportListener(myTransportListener); + try { + if (started) { + restoreTransport(t); + } + reconnectDelay = initialReconnectDelay; + connectedTransportURI = uri; + connectedTransport = t; + reconnectMutex.notifyAll(); + connectFailures = 0; + LOG.info("Successfully reconnected to backup " + uri); + return false; + }catch (Exception e) { + LOG.debug("Backup transport failed",e); + } + } + + Iterator iter = connectList.iterator(); + while(iter.hasNext() && connectedTransport == null && !disposed) { + URI uri = iter.next(); try { LOG.debug("Attempting connect to: " + uri); Transport t = TransportFactory.compositeConnect(uri); t.setTransportListener(myTransportListener); t.start(); - + if (started) { restoreTransport(t); } @@ -129,7 +159,26 @@ public class FailoverTransport implements CompositeTransport { if (transportListener != null) { transportListener.transportResumed(); } - LOG.info("Successfully reconnected to " + uri); + if (firstConnection) { + firstConnection=false; + LOG.info("Successfully connected to " + uri); + if(backup) { + while(iter.hasNext() && backupTransport==null){ + uri = iter.next(); + try { + t = TransportFactory.compositeConnect(uri); + t.setTransportListener(new DefaultTransportListener()); + t.start(); + backupTransport=t; + backupTransportURI=uri; + }catch(Exception e) { + LOG.debug("Failed to create backup to " + uri,e); + } + } + } + }else { + LOG.info("Successfully reconnected to " + uri); + } return false; } catch (Exception e) { failure = e; @@ -488,19 +537,26 @@ public class FailoverTransport implements CompositeTransport { } } - private ArrayList getConnectList() { - ArrayList l = new ArrayList(uris); + private List getConnectList() { + ArrayList l = new ArrayList(uris); + boolean removed = false; + if (connectedTransportURI != null) { + removed = l.remove(connectedTransportURI); + } if (randomize) { // Randomly, reorder the list by random swapping Random r = new Random(); r.setSeed(System.currentTimeMillis()); for (int i = 0; i < l.size(); i++) { int p = r.nextInt(l.size()); - Object t = l.get(p); + URI t = l.get(p); l.set(p, l.get(i)); l.set(i, t); } } + if (removed) { + l.add(connectedTransportURI); + } return l; }