added isDisposed to Transport interface

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@617015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-31 06:34:12 +00:00
parent 5f83e62d66
commit 3ac0537e3e
8 changed files with 118 additions and 10 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport; package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import org.apache.activemq.Service; import org.apache.activemq.Service;
@ -135,5 +136,17 @@ public interface Transport extends Service {
* @return true if fault tolerant * @return true if fault tolerant
*/ */
boolean isFaultTolerant(); boolean isFaultTolerant();
/**
* @return true if the transport is disposed
*/
boolean isDisposed();
/**
* reconnect to another location
* @param uri
* @throws IOException on failure of if not supported
*/
void reconnect(URI uri) throws IOException;
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport; package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
/** /**
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
@ -124,4 +125,12 @@ public class TransportFilter implements TransportListener, Transport {
public boolean isFaultTolerant() { public boolean isFaultTolerant() {
return next.isFaultTolerant(); return next.isFaultTolerant();
} }
public boolean isDisposed() {
return next.isDisposed();
}
public void reconnect(URI uri) throws IOException {
next.reconnect(uri);
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport; package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -105,5 +106,14 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
public boolean isFaultTolerant() { public boolean isFaultTolerant() {
return false; return false;
} }
public void reconnect(URI uri) throws IOException {
throw new IOException("Not supported");
}
public boolean isDisposed() {
return isStopped();
}
} }

View File

@ -18,13 +18,28 @@
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
public class BackupTransport { class BackupTransport extends DefaultTransportListener{
private FailoverTransport failoverTransport;
private Transport transport; private Transport transport;
private URI uri; private URI uri;
private boolean disposed;
BackupTransport(FailoverTransport ft){
this.failoverTransport=ft;
}
public void onException(IOException error) {
this.disposed=true;
if (failoverTransport!=null) {
this.failoverTransport.reconnect();
}
}
public Transport getTransport() { public Transport getTransport() {
return transport; return transport;
} }
@ -38,6 +53,14 @@ public class BackupTransport {
this.uri = uri; this.uri = uri;
} }
public boolean isDisposed() {
return disposed || transport != null && transport.isDisposed();
}
public void setDisposed(boolean disposed) {
this.disposed = disposed;
}
public int hashCode() { public int hashCode() {
return uri != null ? uri.hashCode():-1; return uri != null ? uri.hashCode():-1;
} }

View File

@ -35,7 +35,6 @@ import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
@ -66,6 +65,7 @@ public class FailoverTransport implements CompositeTransport {
private final ConcurrentHashMap<Integer, Command> requestMap = new ConcurrentHashMap<Integer, Command>(); private final ConcurrentHashMap<Integer, Command> requestMap = new ConcurrentHashMap<Integer, Command>();
private URI connectedTransportURI; private URI connectedTransportURI;
private URI failedConnectTransportURI;
private Transport connectedTransport; private Transport connectedTransport;
private final TaskRunner reconnectTask; private final TaskRunner reconnectTask;
private boolean started; private boolean started;
@ -96,9 +96,17 @@ public class FailoverTransport implements CompositeTransport {
// Setup a task that is used to reconnect the a connection async. // Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() { public boolean iterate() {
boolean result = doReconnect(); boolean result=false;
if(!result) { boolean buildBackup=true;
if (connectedTransport==null && !disposed) {
result=doReconnect();
buildBackup=false;
}
if(buildBackup) {
buildBackups(); buildBackups();
}else {
//build backups on the next iteration
result=true;
} }
return result; return result;
} }
@ -171,6 +179,7 @@ public class FailoverTransport implements CompositeTransport {
if (connectedTransport != null) { if (connectedTransport != null) {
initialized = false; initialized = false;
ServiceSupport.dispose(connectedTransport); ServiceSupport.dispose(connectedTransport);
failedConnectTransportURI=connectedTransportURI;
connectedTransport = null; connectedTransport = null;
connectedTransportURI = null; connectedTransportURI = null;
} }
@ -441,8 +450,8 @@ public class FailoverTransport implements CompositeTransport {
private List<URI> getConnectList() { private List<URI> getConnectList() {
ArrayList<URI> l = new ArrayList<URI>(uris); ArrayList<URI> l = new ArrayList<URI>(uris);
boolean removed = false; boolean removed = false;
if (connectedTransportURI != null) { if (failedConnectTransportURI != null) {
removed = l.remove(connectedTransportURI); removed = l.remove(failedConnectTransportURI);
} }
if (randomize) { if (randomize) {
// Randomly, reorder the list by random swapping // Randomly, reorder the list by random swapping
@ -456,7 +465,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
if (removed) { if (removed) {
l.add(connectedTransportURI); l.add(failedConnectTransportURI);
} }
return l; return l;
} }
@ -544,6 +553,7 @@ public class FailoverTransport implements CompositeTransport {
restoreTransport(t); restoreTransport(t);
} }
reconnectDelay = initialReconnectDelay; reconnectDelay = initialReconnectDelay;
failedConnectTransportURI=null;
connectedTransportURI = uri; connectedTransportURI = uri;
connectedTransport = t; connectedTransport = t;
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
@ -625,17 +635,26 @@ public class FailoverTransport implements CompositeTransport {
final boolean buildBackups() { final boolean buildBackups() {
synchronized (reconnectMutex) { synchronized (reconnectMutex) {
if (backup && backups.size() < backupPoolSize) { if (!disposed && backup && backups.size() < backupPoolSize) {
List<URI> connectList = getConnectList(); List<URI> connectList = getConnectList();
//removed disposed backups
List<BackupTransport>disposedList = new ArrayList<BackupTransport>();
for (BackupTransport bt:backups) {
if (bt.isDisposed()) {
disposedList.add(bt);
}
}
backups.removeAll(disposedList);
disposedList.clear();
for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) { for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
URI uri = iter.next(); URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try { try {
BackupTransport bt = new BackupTransport(); BackupTransport bt = new BackupTransport(this);
bt.setUri(uri); bt.setUri(uri);
if (!backups.contains(bt)) { if (!backups.contains(bt)) {
Transport t = TransportFactory.compositeConnect(uri); Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(new DefaultTransportListener()); t.setTransportListener(bt);
t.start(); t.start();
bt.setTransport(t); bt.setTransport(t);
backups.add(bt); backups.add(bt);
@ -650,6 +669,14 @@ public class FailoverTransport implements CompositeTransport {
return false; return false;
} }
public boolean isDisposed() {
return disposed;
}
public void reconnect(URI uri) throws IOException {
add(new URI[] {uri});
}
} }

View File

@ -541,6 +541,12 @@ public class FanoutTransport implements CompositeTransport {
} }
} }
public void reconnect(URI uri) throws IOException {
add(new URI[]{uri});
}
public String getRemoteAddress() { public String getRemoteAddress() {
if (primary != null) { if (primary != null) {
@ -569,4 +575,7 @@ public class FanoutTransport implements CompositeTransport {
this.fanOutQueues = fanOutQueues; this.fanOutQueues = fanOutQueues;
} }
public boolean isDisposed() {
return disposed;
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.mock; package org.apache.activemq.transport.mock;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
@ -139,4 +140,12 @@ public class MockTransport extends DefaultTransportListener implements Transport
return getNext().isFaultTolerant(); return getNext().isFaultTolerant();
} }
public boolean isDisposed() {
return getNext().isDisposed();
}
public void reconnect(URI uri) throws IOException {
getNext().reconnect(uri);
}
} }

View File

@ -327,4 +327,12 @@ public class VMTransport implements Transport, Task {
public boolean isFaultTolerant() { public boolean isFaultTolerant() {
return false; return false;
} }
public boolean isDisposed() {
return disposed;
}
public void reconnect(URI uri) throws IOException {
throw new IOException("Not supported");
}
} }