git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@616506 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-29 20:06:47 +00:00
parent 3249682bda
commit e210657db2
1 changed files with 65 additions and 9 deletions

View File

@ -21,6 +21,7 @@ import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -34,6 +35,7 @@ import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
@ -78,6 +80,12 @@ public class FailoverTransport implements CompositeTransport {
private int connectFailures;
private long reconnectDelay = initialReconnectDelay;
private Exception connectionFailure;
private boolean firstConnection = true;
//optionally always have a backup created
private boolean backup=false;
private URI backupTransportURI;
private Transport backupTransport;
private final TransportListener myTransportListener = createTransportListener();
@ -100,22 +108,44 @@ public class FailoverTransport implements CompositeTransport {
if (connectedTransport != null || disposed || connectionFailure != null) {
return false;
} else {
ArrayList<Object> connectList = getConnectList();
List<URI> connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
Iterator<Object> iter = connectList.iterator();
for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) {
URI uri = (URI)iter.next();
if (backup && backupTransport != null) {
Transport t = backupTransport;
URI uri = backupTransportURI;
backupTransport=null;
backupTransportURI=null;
t.setTransportListener(myTransportListener);
try {
if (started) {
restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport = t;
reconnectMutex.notifyAll();
connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri);
return false;
}catch (Exception e) {
LOG.debug("Backup transport failed",e);
}
}
Iterator<URI> iter = connectList.iterator();
while(iter.hasNext() && connectedTransport == null && !disposed) {
URI uri = iter.next();
try {
LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
if (started) {
restoreTransport(t);
}
@ -129,7 +159,26 @@ public class FailoverTransport implements CompositeTransport {
if (transportListener != null) {
transportListener.transportResumed();
}
LOG.info("Successfully reconnected to " + uri);
if (firstConnection) {
firstConnection=false;
LOG.info("Successfully connected to " + uri);
if(backup) {
while(iter.hasNext() && backupTransport==null){
uri = iter.next();
try {
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(new DefaultTransportListener());
t.start();
backupTransport=t;
backupTransportURI=uri;
}catch(Exception e) {
LOG.debug("Failed to create backup to " + uri,e);
}
}
}
}else {
LOG.info("Successfully reconnected to " + uri);
}
return false;
} catch (Exception e) {
failure = e;
@ -488,19 +537,26 @@ public class FailoverTransport implements CompositeTransport {
}
}
private ArrayList<Object> getConnectList() {
ArrayList<Object> l = new ArrayList<Object>(uris);
private List<URI> getConnectList() {
ArrayList<URI> l = new ArrayList<URI>(uris);
boolean removed = false;
if (connectedTransportURI != null) {
removed = l.remove(connectedTransportURI);
}
if (randomize) {
// Randomly, reorder the list by random swapping
Random r = new Random();
r.setSeed(System.currentTimeMillis());
for (int i = 0; i < l.size(); i++) {
int p = r.nextInt(l.size());
Object t = l.get(p);
URI t = l.get(p);
l.set(p, l.get(i));
l.set(i, t);
}
}
if (removed) {
l.add(connectedTransportURI);
}
return l;
}