Avoid deadlocking with the thread calling oneway() when we are cleaning up the connection due to an Inactivity error.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634411 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-06 20:45:02 +00:00
parent fb0e159499
commit c0b74f0562
1 changed files with 30 additions and 25 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -73,7 +74,7 @@ public class FailoverTransport implements CompositeTransport {
private URI connectedTransportURI; private URI connectedTransportURI;
private URI failedConnectTransportURI; private URI failedConnectTransportURI;
private Transport connectedTransport; private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
private final TaskRunner reconnectTask; private final TaskRunner reconnectTask;
private final ExecutorService executor; private final ExecutorService executor;
private boolean started; private boolean started;
@ -115,7 +116,7 @@ public class FailoverTransport implements CompositeTransport {
public boolean iterate() { public boolean iterate() {
boolean result=false; boolean result=false;
boolean buildBackup=true; boolean buildBackup=true;
if (connectedTransport==null && !disposed) { if (connectedTransport.get()==null && !disposed) {
result=doReconnect(); result=doReconnect();
buildBackup=false; buildBackup=false;
} }
@ -196,6 +197,12 @@ public class FailoverTransport implements CompositeTransport {
if (transportListener != null) { if (transportListener != null) {
transportListener.transportInterupted(); transportListener.transportInterupted();
} }
Transport transport = connectedTransport.get();
if( transport!=null ) {
ServiceSupport.dispose(transport);
}
synchronized (reconnectMutex) { synchronized (reconnectMutex) {
boolean reconnectOk = false; boolean reconnectOk = false;
if(started) { if(started) {
@ -204,11 +211,10 @@ public class FailoverTransport implements CompositeTransport {
reconnectOk = true; reconnectOk = true;
} }
if (connectedTransport != null) { if (connectedTransport.get() != null) {
initialized = false; initialized = false;
ServiceSupport.dispose(connectedTransport);
failedConnectTransportURI=connectedTransportURI; failedConnectTransportURI=connectedTransportURI;
connectedTransport = null; connectedTransport.set(null);
connectedTransportURI = null; connectedTransportURI = null;
connected=false; connected=false;
} }
@ -228,8 +234,8 @@ public class FailoverTransport implements CompositeTransport {
started = true; started = true;
stateTracker.setMaxCacheSize(getMaxCacheSize()); stateTracker.setMaxCacheSize(getMaxCacheSize());
stateTracker.setTrackMessages(isTrackMessages()); stateTracker.setTrackMessages(isTrackMessages());
if (connectedTransport != null) { if (connectedTransport.get() != null) {
stateTracker.restore(connectedTransport); stateTracker.restore(connectedTransport.get());
} else { } else {
reconnect(); reconnect();
} }
@ -247,9 +253,8 @@ public class FailoverTransport implements CompositeTransport {
disposed = true; disposed = true;
connected = false; connected = false;
if (connectedTransport != null) { if (connectedTransport.get() != null) {
transportToStop = connectedTransport; transportToStop = connectedTransport.getAndSet(null);
connectedTransport = null;
} }
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
} }
@ -296,7 +301,7 @@ public class FailoverTransport implements CompositeTransport {
} }
public Transport getConnectedTransport() { public Transport getConnectedTransport() {
return connectedTransport; return connectedTransport.get();
} }
public URI getConnectedTransportURI() { public URI getConnectedTransportURI() {
@ -373,7 +378,7 @@ public class FailoverTransport implements CompositeTransport {
synchronized (reconnectMutex) { synchronized (reconnectMutex) {
if (isShutdownCommand(command) && connectedTransport == null) { if (isShutdownCommand(command) && connectedTransport.get() == null) {
if(command.isShutdownInfo()) { if(command.isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected. // Skipping send of ShutdownInfo command when not connected.
return; return;
@ -391,7 +396,7 @@ public class FailoverTransport implements CompositeTransport {
try { try {
// Wait for transport to be connected. // Wait for transport to be connected.
while (connectedTransport == null && !disposed && connectionFailure == null) { while (connectedTransport.get() == null && !disposed && connectionFailure == null) {
LOG.trace("Waiting for transport to reconnect."); LOG.trace("Waiting for transport to reconnect.");
try { try {
reconnectMutex.wait(1000); reconnectMutex.wait(1000);
@ -401,7 +406,7 @@ public class FailoverTransport implements CompositeTransport {
} }
} }
if (connectedTransport == null) { if (connectedTransport.get() == null) {
// Previous loop may have exited due to use being // Previous loop may have exited due to use being
// disposed. // disposed.
if (disposed) { if (disposed) {
@ -427,7 +432,7 @@ public class FailoverTransport implements CompositeTransport {
// Send the message. // Send the message.
try { try {
connectedTransport.oneway(command); connectedTransport.get().oneway(command);
stateTracker.trackBack(command); stateTracker.trackBack(command);
} catch (IOException e) { } catch (IOException e) {
@ -559,10 +564,9 @@ public class FailoverTransport implements CompositeTransport {
if (target.isAssignableFrom(getClass())) { if (target.isAssignableFrom(getClass())) {
return target.cast(this); return target.cast(this);
} }
synchronized (reconnectMutex) { Transport transport = connectedTransport.get();
if (connectedTransport != null) { if ( transport != null) {
return connectedTransport.narrow(target); return transport.narrow(target);
}
} }
return null; return null;
@ -594,8 +598,9 @@ public class FailoverTransport implements CompositeTransport {
} }
public String getRemoteAddress() { public String getRemoteAddress() {
if (connectedTransport != null) { Transport transport = connectedTransport.get();
return connectedTransport.getRemoteAddress(); if ( transport != null) {
return transport.getRemoteAddress();
} }
return null; return null;
} }
@ -613,7 +618,7 @@ public class FailoverTransport implements CompositeTransport {
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
} }
if (connectedTransport != null || disposed || connectionFailure != null) { if (connectedTransport.get() != null || disposed || connectionFailure != null) {
return false; return false;
} else { } else {
List<URI> connectList = getConnectList(); List<URI> connectList = getConnectList();
@ -635,7 +640,7 @@ public class FailoverTransport implements CompositeTransport {
reconnectDelay = initialReconnectDelay; reconnectDelay = initialReconnectDelay;
failedConnectTransportURI=null; failedConnectTransportURI=null;
connectedTransportURI = uri; connectedTransportURI = uri;
connectedTransport = t; connectedTransport.set(t);
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
connectFailures = 0; connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri); LOG.info("Successfully reconnected to backup " + uri);
@ -646,7 +651,7 @@ public class FailoverTransport implements CompositeTransport {
} }
Iterator<URI> iter = connectList.iterator(); Iterator<URI> iter = connectList.iterator();
while(iter.hasNext() && connectedTransport == null && !disposed) { while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
URI uri = iter.next(); URI uri = iter.next();
try { try {
LOG.debug("Attempting connect to: " + uri); LOG.debug("Attempting connect to: " + uri);
@ -661,7 +666,7 @@ public class FailoverTransport implements CompositeTransport {
LOG.debug("Connection established"); LOG.debug("Connection established");
reconnectDelay = initialReconnectDelay; reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri; connectedTransportURI = uri;
connectedTransport = t; connectedTransport.set(t);
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
connectFailures = 0; connectFailures = 0;
if (transportListener != null) { if (transportListener != null) {