git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@616733 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-30 12:10:22 +00:00
parent e210657db2
commit 418823a7e1
2 changed files with 161 additions and 123 deletions

View File

@ -132,7 +132,7 @@ public interface Transport extends Service {
/**
* Indicates if the transport can handle faults
*
* @return tru if fault tolerant
* @return true if fault tolerant
*/
boolean isFaultTolerant();

View File

@ -83,8 +83,8 @@ public class FailoverTransport implements CompositeTransport {
private boolean firstConnection = true;
//optionally always have a backup created
private boolean backup=false;
private URI backupTransportURI;
private Transport backupTransport;
private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize=2;
private final TransportListener myTransportListener = createTransportListener();
@ -95,127 +95,12 @@ public class FailoverTransport implements CompositeTransport {
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
Exception failure = null;
synchronized (reconnectMutex) {
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if (connectedTransport != null || disposed || connectionFailure != null) {
return false;
} else {
List<URI> connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
if (backup && backupTransport != null) {
Transport t = backupTransport;
URI uri = backupTransportURI;
backupTransport=null;
backupTransportURI=null;
t.setTransportListener(myTransportListener);
try {
if (started) {
restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport = t;
reconnectMutex.notifyAll();
connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri);
return false;
}catch (Exception e) {
LOG.debug("Backup transport failed",e);
}
}
Iterator<URI> iter = connectList.iterator();
while(iter.hasNext() && connectedTransport == null && !disposed) {
URI uri = iter.next();
try {
LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
if (started) {
restoreTransport(t);
}
LOG.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport = t;
reconnectMutex.notifyAll();
connectFailures = 0;
if (transportListener != null) {
transportListener.transportResumed();
}
if (firstConnection) {
firstConnection=false;
LOG.info("Successfully connected to " + uri);
if(backup) {
while(iter.hasNext() && backupTransport==null){
uri = iter.next();
try {
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(new DefaultTransportListener());
t.start();
backupTransport=t;
backupTransportURI=uri;
}catch(Exception e) {
LOG.debug("Failed to create backup to " + uri,e);
}
}
}
}else {
LOG.info("Successfully reconnected to " + uri);
}
return false;
} catch (Exception e) {
failure = e;
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
}
}
}
}
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
reconnectMutex.notifyAll();
return false;
}
}
if (!disposed) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
synchronized (sleepMutex) {
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
}
}
}
return !disposed;
boolean result = doReconnect();
if(!result) {
buildBackups();
}
return result;
}
}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
@ -394,6 +279,22 @@ public class FailoverTransport implements CompositeTransport {
public void setRandomize(boolean randomize) {
this.randomize = randomize;
}
public boolean isBackup() {
return backup;
}
public void setBackup(boolean backup) {
this.backup = backup;
}
public int getBackupPoolSize() {
return backupPoolSize;
}
public void setBackupPoolSize(int backupPoolSize) {
this.backupPoolSize = backupPoolSize;
}
public void oneway(Object o) throws IOException {
Command command = (Command)o;
@ -613,5 +514,142 @@ public class FailoverTransport implements CompositeTransport {
public boolean isFaultTolerant() {
return true;
}
final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if (connectedTransport != null || disposed || connectionFailure != null) {
return false;
} else {
List<URI> connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
if (backup && !backups.isEmpty()) {
BackupTransport bt = backups.remove(0);
Transport t = bt.getTransport();
URI uri = bt.getUri();
t.setTransportListener(myTransportListener);
try {
if (started) {
restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport = t;
reconnectMutex.notifyAll();
connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri);
return false;
}catch (Exception e) {
LOG.debug("Backup transport failed",e);
}
}
Iterator<URI> iter = connectList.iterator();
while(iter.hasNext() && connectedTransport == null && !disposed) {
URI uri = iter.next();
try {
LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
if (started) {
restoreTransport(t);
}
LOG.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport = t;
reconnectMutex.notifyAll();
connectFailures = 0;
if (transportListener != null) {
transportListener.transportResumed();
}
if (firstConnection) {
firstConnection=false;
LOG.info("Successfully connected to " + uri);
}else {
LOG.info("Successfully reconnected to " + uri);
}
return false;
} catch (Exception e) {
failure = e;
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
}
}
}
}
if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
reconnectMutex.notifyAll();
return false;
}
}
if (!disposed) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
synchronized (sleepMutex) {
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
}
}
}
return !disposed;
}
final boolean buildBackups() {
synchronized (reconnectMutex) {
if (backup && backups.size() < backupPoolSize) {
List<URI> connectList = getConnectList();
for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
BackupTransport bt = new BackupTransport();
bt.setUri(uri);
if (!backups.contains(bt)) {
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(new DefaultTransportListener());
t.start();
bt.setTransport(t);
backups.add(bt);
}
}catch(Exception e) {
LOG.debug("Failed to build backup ",e);
}
}
}
}
}
return false;
}
}