diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index f1a8baa498..08e5b58787 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; @@ -73,7 +74,7 @@ public class FailoverTransport implements CompositeTransport { private URI connectedTransportURI; private URI failedConnectTransportURI; - private Transport connectedTransport; + private final AtomicReference connectedTransport = new AtomicReference(); private final TaskRunner reconnectTask; private final ExecutorService executor; private boolean started; @@ -115,7 +116,7 @@ public class FailoverTransport implements CompositeTransport { public boolean iterate() { boolean result=false; boolean buildBackup=true; - if (connectedTransport==null && !disposed) { + if (connectedTransport.get()==null && !disposed) { result=doReconnect(); buildBackup=false; } @@ -196,6 +197,12 @@ public class FailoverTransport implements CompositeTransport { if (transportListener != null) { transportListener.transportInterupted(); } + + Transport transport = connectedTransport.get(); + if( transport!=null ) { + ServiceSupport.dispose(transport); + } + synchronized (reconnectMutex) { boolean reconnectOk = false; if(started) { @@ -204,11 +211,10 @@ public class FailoverTransport implements CompositeTransport { reconnectOk = true; } - if (connectedTransport != null) { + if (connectedTransport.get() != null) { initialized = false; - ServiceSupport.dispose(connectedTransport); failedConnectTransportURI=connectedTransportURI; - connectedTransport = null; + connectedTransport.set(null); connectedTransportURI = null; connected=false; } @@ -228,8 +234,8 @@ public class FailoverTransport implements CompositeTransport { started = true; stateTracker.setMaxCacheSize(getMaxCacheSize()); stateTracker.setTrackMessages(isTrackMessages()); - if (connectedTransport != null) { - stateTracker.restore(connectedTransport); + if (connectedTransport.get() != null) { + stateTracker.restore(connectedTransport.get()); } else { reconnect(); } @@ -247,9 +253,8 @@ public class FailoverTransport implements CompositeTransport { disposed = true; connected = false; - if (connectedTransport != null) { - transportToStop = connectedTransport; - connectedTransport = null; + if (connectedTransport.get() != null) { + transportToStop = connectedTransport.getAndSet(null); } reconnectMutex.notifyAll(); } @@ -296,7 +301,7 @@ public class FailoverTransport implements CompositeTransport { } public Transport getConnectedTransport() { - return connectedTransport; + return connectedTransport.get(); } public URI getConnectedTransportURI() { @@ -373,7 +378,7 @@ public class FailoverTransport implements CompositeTransport { synchronized (reconnectMutex) { - if (isShutdownCommand(command) && connectedTransport == null) { + if (isShutdownCommand(command) && connectedTransport.get() == null) { if(command.isShutdownInfo()) { // Skipping send of ShutdownInfo command when not connected. return; @@ -391,7 +396,7 @@ public class FailoverTransport implements CompositeTransport { try { // Wait for transport to be connected. - while (connectedTransport == null && !disposed && connectionFailure == null) { + while (connectedTransport.get() == null && !disposed && connectionFailure == null) { LOG.trace("Waiting for transport to reconnect."); try { reconnectMutex.wait(1000); @@ -401,7 +406,7 @@ public class FailoverTransport implements CompositeTransport { } } - if (connectedTransport == null) { + if (connectedTransport.get() == null) { // Previous loop may have exited due to use being // disposed. if (disposed) { @@ -427,7 +432,7 @@ public class FailoverTransport implements CompositeTransport { // Send the message. try { - connectedTransport.oneway(command); + connectedTransport.get().oneway(command); stateTracker.trackBack(command); } catch (IOException e) { @@ -559,10 +564,9 @@ public class FailoverTransport implements CompositeTransport { if (target.isAssignableFrom(getClass())) { return target.cast(this); } - synchronized (reconnectMutex) { - if (connectedTransport != null) { - return connectedTransport.narrow(target); - } + Transport transport = connectedTransport.get(); + if ( transport != null) { + return transport.narrow(target); } return null; @@ -594,8 +598,9 @@ public class FailoverTransport implements CompositeTransport { } public String getRemoteAddress() { - if (connectedTransport != null) { - return connectedTransport.getRemoteAddress(); + Transport transport = connectedTransport.get(); + if ( transport != null) { + return transport.getRemoteAddress(); } return null; } @@ -613,7 +618,7 @@ public class FailoverTransport implements CompositeTransport { reconnectMutex.notifyAll(); } - if (connectedTransport != null || disposed || connectionFailure != null) { + if (connectedTransport.get() != null || disposed || connectionFailure != null) { return false; } else { List connectList = getConnectList(); @@ -635,7 +640,7 @@ public class FailoverTransport implements CompositeTransport { reconnectDelay = initialReconnectDelay; failedConnectTransportURI=null; connectedTransportURI = uri; - connectedTransport = t; + connectedTransport.set(t); reconnectMutex.notifyAll(); connectFailures = 0; LOG.info("Successfully reconnected to backup " + uri); @@ -646,7 +651,7 @@ public class FailoverTransport implements CompositeTransport { } Iterator iter = connectList.iterator(); - while(iter.hasNext() && connectedTransport == null && !disposed) { + while(iter.hasNext() && connectedTransport.get() == null && !disposed) { URI uri = iter.next(); try { LOG.debug("Attempting connect to: " + uri); @@ -661,7 +666,7 @@ public class FailoverTransport implements CompositeTransport { LOG.debug("Connection established"); reconnectDelay = initialReconnectDelay; connectedTransportURI = uri; - connectedTransport = t; + connectedTransport.set(t); reconnectMutex.notifyAll(); connectFailures = 0; if (transportListener != null) {