Prevent conccurent calls to handleTransportFailure from closing an
already reconnected transport instance.
This commit is contained in:
Timothy Bish 2016-04-18 09:45:45 -04:00
parent 11622b3af3
commit 23a5beb86c
1 changed files with 86 additions and 109 deletions

View File

@ -111,9 +111,7 @@ public class FailoverTransport implements CompositeTransport {
private boolean trackMessages = false;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private final TransportListener disposedListener = new DefaultTransportListener() {
};
private final TransportListener myTransportListener = createTransportListener();
private final TransportListener disposedListener = new DefaultTransportListener() {};
private boolean updateURIsSupported = true;
private boolean reconnectSupported = true;
// remember for reconnect thread
@ -180,11 +178,8 @@ public class FailoverTransport implements CompositeTransport {
}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
TransportListener createTransportListener() {
return new TransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
private void processCommand(Object incoming) {
Command command = (Command) incoming;
if (command == null) {
return;
}
@ -200,41 +195,44 @@ public class FailoverTransport implements CompositeTransport {
if (command.isConnectionControl()) {
handleConnectionControl((ConnectionControl) command);
}
else if (command.isConsumerControl()) {
} else if (command.isConsumerControl()) {
ConsumerControl consumerControl = (ConsumerControl)command;
if (consumerControl.isClose()) {
stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
}
}
if (transportListener != null) {
transportListener.onCommand(command);
}
}
private TransportListener createTransportListener(final Transport owner) {
return new TransportListener() {
@Override
public void onCommand(Object o) {
processCommand(o);
}
@Override
public void onException(IOException error) {
try {
handleTransportFailure(error);
handleTransportFailure(owner, error);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (transportListener != null) {
transportListener.onException(new InterruptedIOException());
}
}
}
@Override
public void transportInterupted() {
if (transportListener != null) {
transportListener.transportInterupted();
}
}
@Override
public void transportResumed() {
if (transportListener != null) {
transportListener.transportResumed();
}
}
};
}
@ -245,6 +243,10 @@ public class FailoverTransport implements CompositeTransport {
}
public final void handleTransportFailure(IOException e) throws InterruptedException {
handleTransportFailure(getConnectedTransport(), e);
}
public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {
if (shuttingDown) {
// shutdown info sent and remote socket closed and we see that before a local close
// let the close do the work
@ -256,21 +258,25 @@ public class FailoverTransport implements CompositeTransport {
}
// could be blocked in write with the reconnectMutex held, but still needs to be whacked
Transport transport = connectedTransport.getAndSet(null);
Transport transport = null;
if (connectedTransport.compareAndSet(failed, null)) {
transport = failed;
if (transport != null) {
disposeTransport(transport);
}
}
synchronized (reconnectMutex) {
if (transport != null && connectedTransport.get() == null) {
boolean reconnectOk = false;
if (canReconnect()) {
reconnectOk = true;
}
LOG.warn("Transport (" + connectedTransportURI + ") failed"
+ (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",
connectedTransport, (reconnectOk ? "," : ", not"), e);
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
@ -347,9 +353,7 @@ public class FailoverTransport implements CompositeTransport {
@Override
public void start() throws Exception {
synchronized (reconnectMutex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Started " + this);
}
LOG.debug("Started {}", this);
if (started) {
return;
}
@ -373,7 +377,7 @@ public class FailoverTransport implements CompositeTransport {
try {
synchronized (reconnectMutex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopped " + this);
LOG.debug("Stopped {}", this);
}
if (!started) {
return;
@ -407,9 +411,7 @@ public class FailoverTransport implements CompositeTransport {
}
for (Transport transport : backupsToStop) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Stopped backup: " + transport);
}
LOG.trace("Stopped backup: {}", transport);
disposeTransport(transport);
} catch (Exception e) {
}
@ -581,7 +583,7 @@ public class FailoverTransport implements CompositeTransport {
if (command.isResponseRequired()) {
Response response = new Response();
response.setCorrelationId(command.getCommandId());
myTransportListener.onCommand(response);
processCommand(response);
}
return;
} else if (command instanceof MessagePull) {
@ -591,7 +593,7 @@ public class FailoverTransport implements CompositeTransport {
MessageDispatch dispatch = new MessageDispatch();
dispatch.setConsumerId(pullRequest.getConsumerId());
dispatch.setDestination(pullRequest.getDestination());
myTransportListener.onCommand(dispatch);
processCommand(dispatch);
}
return;
}
@ -607,24 +609,19 @@ public class FailoverTransport implements CompositeTransport {
boolean timedout = false;
while (transport == null && !disposed && connectionFailure == null
&& !Thread.currentThread().isInterrupted() && willReconnect()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Waiting for transport to reconnect..: " + command);
}
LOG.trace("Waiting for transport to reconnect..: {}", command);
long end = System.currentTimeMillis();
if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
timedout = true;
if (LOG.isInfoEnabled()) {
LOG.info("Failover timed out after " + (end - start) + "ms");
}
LOG.info("Failover timed out after {} ms", (end - start));
break;
}
try {
reconnectMutex.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (LOG.isDebugEnabled()) {
LOG.debug("Interupted: " + e, e);
}
LOG.debug("Interupted:", e);
}
transport = connectedTransport.get();
}
@ -650,7 +647,7 @@ public class FailoverTransport implements CompositeTransport {
try {
tracked = stateTracker.track(command);
} catch (IOException ioe) {
LOG.debug("Cannot track the command " + command, ioe);
LOG.debug("Cannot track the command {} {}", command, ioe);
}
// If it was a request and it was not being tracked by
// the state tracker,
@ -691,19 +688,14 @@ public class FailoverTransport implements CompositeTransport {
} else {
// Handle the error but allow the method to return since the
// tracked commands are replayed on reconnect.
if (LOG.isDebugEnabled()) {
LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
}
LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
handleTransportFailure(e);
}
}
return;
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
}
LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
handleTransportFailure(e);
}
}
@ -774,7 +766,7 @@ public class FailoverTransport implements CompositeTransport {
}
} catch (Exception e) {
LOG.error("Failed to parse URI: " + u);
LOG.error("Failed to parse URI: {}", u);
}
}
@ -818,9 +810,9 @@ public class FailoverTransport implements CompositeTransport {
if (removed) {
l.add(failedConnectTransportURI);
}
if (LOG.isDebugEnabled()) {
LOG.debug("urlList connectionList:" + l + ", from: " + uris);
}
LOG.debug("urlList connectionList:{}, from: {}", l, uris);
return l;
}
@ -861,9 +853,7 @@ public class FailoverTransport implements CompositeTransport {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
}
for (Command command : tmpMap.values()) {
if (LOG.isTraceEnabled()) {
LOG.trace("restore requestMap, replay: " + command);
}
LOG.trace("restore requestMap, replay: {}", command);
t.oneway(command);
}
}
@ -916,7 +906,7 @@ public class FailoverTransport implements CompositeTransport {
}
newUris = buffer.toString();
} catch (IOException ioe) {
LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe);
} finally {
if (in != null) {
try {
@ -954,9 +944,7 @@ public class FailoverTransport implements CompositeTransport {
doRebalance = false;
return false;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
}
LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList);
try {
Transport transport = this.connectedTransport.getAndSet(null);
@ -964,11 +952,9 @@ public class FailoverTransport implements CompositeTransport {
disposeTransport(transport);
}
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Caught an exception stopping existing transport for rebalance", e);
}
}
}
doRebalance = false;
}
@ -988,7 +974,7 @@ public class FailoverTransport implements CompositeTransport {
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
myTransportListener.onCommand(bt.getBrokerInfo());
processCommand(bt.getBrokerInfo());
if (priorityBackup && priorityBackupAvailable) {
Transport old = this.connectedTransport.getAndSet(null);
if (old != null) {
@ -1023,19 +1009,17 @@ public class FailoverTransport implements CompositeTransport {
transport = TransportFactory.compositeConnect(uri);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting " + connectFailures + "th connect to: " + uri);
}
transport.setTransportListener(myTransportListener);
LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);
transport.setTransportListener(createTransportListener(transport));
transport.start();
if (started && !firstConnection) {
restoreTransport(transport);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Connection established");
}
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(transport);
@ -1058,33 +1042,26 @@ public class FailoverTransport implements CompositeTransport {
if (transportListener != null) {
transportListener.transportResumed();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("transport resumed by transport listener not set");
}
}
if (firstConnection) {
firstConnection = false;
LOG.info("Successfully connected to " + uri);
LOG.info("Successfully connected to {}", uri);
} else {
LOG.info("Successfully reconnected to " + uri);
LOG.info("Successfully reconnected to {}", uri);
}
return false;
} catch (Exception e) {
failure = e;
if (LOG.isDebugEnabled()) {
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
}
LOG.debug("Connect fail to: {}, reason: {}", uri, e);
if (transport != null) {
try {
transport.stop();
transport = null;
} catch (Exception ee) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stop of failed transport: " + transport +
" failed with reason: " + ee);
}
LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
}
}
} finally {
@ -1098,7 +1075,7 @@ public class FailoverTransport implements CompositeTransport {
connectFailures++;
if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures);
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been
@ -1406,9 +1383,9 @@ public class FailoverTransport implements CompositeTransport {
} catch(IOException e) {
if (firstAddr == null) {
LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e);
} else {
LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e);
}
if (first.getHost().equalsIgnoreCase(second.getHost())) {