add isConnected() flag to a Transport

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@618656 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-05 14:06:43 +00:00
parent c128634fa6
commit ecc87ea672
8 changed files with 52 additions and 13 deletions

View File

@ -142,6 +142,11 @@ public interface Transport extends Service {
*/ */
boolean isDisposed(); boolean isDisposed();
/**
* @return true if the transport is connected
*/
boolean isConnected();
/** /**
* reconnect to another location * reconnect to another location
* @param uri * @param uri

View File

@ -130,6 +130,10 @@ public class TransportFilter implements TransportListener, Transport {
return next.isDisposed(); return next.isDisposed();
} }
public boolean isConnected() {
return next.isConnected();
}
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
next.reconnect(uri); next.reconnect(uri);
} }

View File

@ -116,4 +116,8 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
return isStopped(); return isStopped();
} }
public boolean isConnected() {
return isStarted();
}
} }

View File

@ -57,6 +57,7 @@ public class FailoverTransport implements CompositeTransport {
private TransportListener transportListener; private TransportListener transportListener;
private boolean disposed; private boolean disposed;
private boolean connected;
private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
private final Object reconnectMutex = new Object(); private final Object reconnectMutex = new Object();
@ -182,6 +183,7 @@ public class FailoverTransport implements CompositeTransport {
failedConnectTransportURI=connectedTransportURI; failedConnectTransportURI=connectedTransportURI;
connectedTransport = null; connectedTransport = null;
connectedTransportURI = null; connectedTransportURI = null;
connected=false;
} }
reconnectTask.wakeup(); reconnectTask.wakeup();
} }
@ -211,6 +213,7 @@ public class FailoverTransport implements CompositeTransport {
} }
started = false; started = false;
disposed = true; disposed = true;
connected = false;
if (connectedTransport != null) { if (connectedTransport != null) {
transportToStop = connectedTransport; transportToStop = connectedTransport;
@ -593,6 +596,7 @@ public class FailoverTransport implements CompositeTransport {
}else { }else {
LOG.info("Successfully reconnected to " + uri); LOG.info("Successfully reconnected to " + uri);
} }
connected=true;
return false; return false;
} catch (Exception e) { } catch (Exception e) {
failure = e; failure = e;
@ -669,14 +673,17 @@ public class FailoverTransport implements CompositeTransport {
return false; return false;
} }
public boolean isDisposed() { public boolean isDisposed() {
return disposed; return disposed;
} }
public void reconnect(URI uri) throws IOException {
add(new URI[] {uri});
}
public boolean isConnected() {
return connected;
}
public void reconnect(URI uri) throws IOException {
add(new URI[] {uri});
}
} }

View File

@ -56,6 +56,7 @@ public class FanoutTransport implements CompositeTransport {
private TransportListener transportListener; private TransportListener transportListener;
private boolean disposed; private boolean disposed;
private boolean connected;
private final Object reconnectMutex = new Object(); private final Object reconnectMutex = new Object();
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
@ -281,6 +282,7 @@ public class FanoutTransport implements CompositeTransport {
restoreTransport(th); restoreTransport(th);
} }
} }
connected=true;
} }
} }
@ -293,6 +295,7 @@ public class FanoutTransport implements CompositeTransport {
} }
started = false; started = false;
disposed = true; disposed = true;
connected=false;
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next(); FanoutTransportHandler th = iter.next();
@ -578,4 +581,9 @@ public class FanoutTransport implements CompositeTransport {
public boolean isDisposed() { public boolean isDisposed() {
return disposed; return disposed;
} }
public boolean isConnected() {
return connected;
}
} }

View File

@ -144,8 +144,11 @@ public class MockTransport extends DefaultTransportListener implements Transport
return getNext().isDisposed(); return getNext().isDisposed();
} }
public boolean isConnected() {
return getNext().isConnected();
}
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
getNext().reconnect(uri); getNext().reconnect(uri);
} }
} }

View File

@ -17,13 +17,11 @@
package org.apache.activemq.transport.vm; package org.apache.activemq.transport.vm;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
@ -332,6 +330,10 @@ public class VMTransport implements Transport, Task {
return disposed; return disposed;
} }
public boolean isConnected() {
return started;
}
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
throw new IOException("Not supported"); throw new IOException("Not supported");
} }

View File

@ -45,7 +45,13 @@ public abstract class ServiceSupport implements Service {
public void start() throws Exception { public void start() throws Exception {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
doStart(); boolean success = false;
try {
doStart();
success = true;
} finally {
started.set(success);
}
} }
} }