diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java index e5e531918b..ab99e20cc2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java @@ -142,6 +142,11 @@ public interface Transport extends Service { */ boolean isDisposed(); + /** + * @return true if the transport is connected + */ + boolean isConnected(); + /** * reconnect to another location * @param uri diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java index 1b87824e61..b58f71f8a9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -129,6 +129,10 @@ public class TransportFilter implements TransportListener, Transport { public boolean isDisposed() { return next.isDisposed(); } + + public boolean isConnected() { + return next.isConnected(); + } public void reconnect(URI uri) throws IOException { next.reconnect(uri); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java index 0ab5e24401..13876047bf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java @@ -115,5 +115,9 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo public boolean isDisposed() { return isStopped(); } + + public boolean isConnected() { + return isStarted(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 3ccaa1565c..f0579269e3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -57,6 +57,7 @@ public class FailoverTransport implements CompositeTransport { private TransportListener transportListener; private boolean disposed; + private boolean connected; private final CopyOnWriteArrayList uris = new CopyOnWriteArrayList(); private final Object reconnectMutex = new Object(); @@ -182,6 +183,7 @@ public class FailoverTransport implements CompositeTransport { failedConnectTransportURI=connectedTransportURI; connectedTransport = null; connectedTransportURI = null; + connected=false; } reconnectTask.wakeup(); } @@ -211,6 +213,7 @@ public class FailoverTransport implements CompositeTransport { } started = false; disposed = true; + connected = false; if (connectedTransport != null) { transportToStop = connectedTransport; @@ -593,6 +596,7 @@ public class FailoverTransport implements CompositeTransport { }else { LOG.info("Successfully reconnected to " + uri); } + connected=true; return false; } catch (Exception e) { failure = e; @@ -669,14 +673,17 @@ public class FailoverTransport implements CompositeTransport { return false; } -public boolean isDisposed() { - return disposed; -} - -public void reconnect(URI uri) throws IOException { - add(new URI[] {uri}); -} - - + public boolean isDisposed() { + return disposed; + } + + + public boolean isConnected() { + return connected; + } + + public void reconnect(URI uri) throws IOException { + add(new URI[] {uri}); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index b331ef8258..abd71c072a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -56,6 +56,7 @@ public class FanoutTransport implements CompositeTransport { private TransportListener transportListener; private boolean disposed; + private boolean connected; private final Object reconnectMutex = new Object(); private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); @@ -281,6 +282,7 @@ public class FanoutTransport implements CompositeTransport { restoreTransport(th); } } + connected=true; } } @@ -293,6 +295,7 @@ public class FanoutTransport implements CompositeTransport { } started = false; disposed = true; + connected=false; for (Iterator iter = transports.iterator(); iter.hasNext();) { FanoutTransportHandler th = iter.next(); @@ -578,4 +581,9 @@ public class FanoutTransport implements CompositeTransport { public boolean isDisposed() { return disposed; } + + + public boolean isConnected() { + return connected; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java index f4ca2b0544..d0f6a41e63 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java @@ -143,9 +143,12 @@ public class MockTransport extends DefaultTransportListener implements Transport public boolean isDisposed() { return getNext().isDisposed(); } + + public boolean isConnected() { + return getNext().isConnected(); + } public void reconnect(URI uri) throws IOException { getNext().reconnect(uri); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index a48fc25e59..2304910404 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -17,13 +17,11 @@ package org.apache.activemq.transport.vm; import java.io.IOException; -import java.io.InterruptedIOException; import java.net.URI; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.command.Command; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -331,6 +329,10 @@ public class VMTransport implements Transport, Task { public boolean isDisposed() { return disposed; } + + public boolean isConnected() { + return started; + } public void reconnect(URI uri) throws IOException { throw new IOException("Not supported"); diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java index c7842afc15..d7969ac351 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java @@ -45,7 +45,13 @@ public abstract class ServiceSupport implements Service { public void start() throws Exception { if (started.compareAndSet(false, true)) { - doStart(); + boolean success = false; + try { + doStart(); + success = true; + } finally { + started.set(success); + } } }