diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 553036ba85..c0bc9da359 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -111,9 +111,7 @@ public class FailoverTransport implements CompositeTransport { private boolean trackMessages = false; private boolean trackTransactionProducers = true; private int maxCacheSize = 128 * 1024; - private final TransportListener disposedListener = new DefaultTransportListener() { - }; - private final TransportListener myTransportListener = createTransportListener(); + private final TransportListener disposedListener = new DefaultTransportListener() {}; private boolean updateURIsSupported = true; private boolean reconnectSupported = true; // remember for reconnect thread @@ -180,61 +178,61 @@ public class FailoverTransport implements CompositeTransport { }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); } - TransportListener createTransportListener() { + private void processCommand(Object incoming) { + Command command = (Command) incoming; + if (command == null) { + return; + } + if (command.isResponse()) { + Object object = null; + synchronized (requestMap) { + object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); + } + if (object != null && object.getClass() == Tracked.class) { + ((Tracked) object).onResponses(command); + } + } + + if (command.isConnectionControl()) { + handleConnectionControl((ConnectionControl) command); + } else if (command.isConsumerControl()) { + ConsumerControl consumerControl = (ConsumerControl)command; + if (consumerControl.isClose()) { + stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); + } + } + + if (transportListener != null) { + transportListener.onCommand(command); + } + } + + private TransportListener createTransportListener(final Transport owner) { return new TransportListener() { + @Override public void onCommand(Object o) { - Command command = (Command) o; - if (command == null) { - return; - } - if (command.isResponse()) { - Object object = null; - synchronized (requestMap) { - object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); - } - if (object != null && object.getClass() == Tracked.class) { - ((Tracked) object).onResponses(command); - } - } - - if (command.isConnectionControl()) { - handleConnectionControl((ConnectionControl) command); - } - else if (command.isConsumerControl()) { - ConsumerControl consumerControl = (ConsumerControl)command; - if (consumerControl.isClose()) { - stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); - } - - } - if (transportListener != null) { - transportListener.onCommand(command); - } + processCommand(o); } @Override public void onException(IOException error) { try { - handleTransportFailure(error); + handleTransportFailure(owner, error); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - transportListener.onException(new InterruptedIOException()); + if (transportListener != null) { + transportListener.onException(new InterruptedIOException()); + } } } @Override public void transportInterupted() { - if (transportListener != null) { - transportListener.transportInterupted(); - } } @Override public void transportResumed() { - if (transportListener != null) { - transportListener.transportResumed(); - } } }; } @@ -245,6 +243,10 @@ public class FailoverTransport implements CompositeTransport { } public final void handleTransportFailure(IOException e) throws InterruptedException { + handleTransportFailure(getConnectedTransport(), e); + } + + public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException { if (shuttingDown) { // shutdown info sent and remote socket closed and we see that before a local close // let the close do the work @@ -256,21 +258,25 @@ public class FailoverTransport implements CompositeTransport { } // could be blocked in write with the reconnectMutex held, but still needs to be whacked - Transport transport = connectedTransport.getAndSet(null); - if (transport != null) { - disposeTransport(transport); + Transport transport = null; + + if (connectedTransport.compareAndSet(failed, null)) { + transport = failed; + if (transport != null) { + disposeTransport(transport); + } } synchronized (reconnectMutex) { if (transport != null && connectedTransport.get() == null) { - boolean reconnectOk = false; if (canReconnect()) { reconnectOk = true; } - LOG.warn("Transport (" + connectedTransportURI + ") failed" - + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e); + + LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}", + connectedTransport, (reconnectOk ? "," : ", not"), e); failedConnectTransportURI = connectedTransportURI; connectedTransportURI = null; @@ -347,9 +353,7 @@ public class FailoverTransport implements CompositeTransport { @Override public void start() throws Exception { synchronized (reconnectMutex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Started " + this); - } + LOG.debug("Started {}", this); if (started) { return; } @@ -373,7 +377,7 @@ public class FailoverTransport implements CompositeTransport { try { synchronized (reconnectMutex) { if (LOG.isDebugEnabled()) { - LOG.debug("Stopped " + this); + LOG.debug("Stopped {}", this); } if (!started) { return; @@ -407,9 +411,7 @@ public class FailoverTransport implements CompositeTransport { } for (Transport transport : backupsToStop) { try { - if (LOG.isTraceEnabled()) { - LOG.trace("Stopped backup: " + transport); - } + LOG.trace("Stopped backup: {}", transport); disposeTransport(transport); } catch (Exception e) { } @@ -581,7 +583,7 @@ public class FailoverTransport implements CompositeTransport { if (command.isResponseRequired()) { Response response = new Response(); response.setCorrelationId(command.getCommandId()); - myTransportListener.onCommand(response); + processCommand(response); } return; } else if (command instanceof MessagePull) { @@ -591,7 +593,7 @@ public class FailoverTransport implements CompositeTransport { MessageDispatch dispatch = new MessageDispatch(); dispatch.setConsumerId(pullRequest.getConsumerId()); dispatch.setDestination(pullRequest.getDestination()); - myTransportListener.onCommand(dispatch); + processCommand(dispatch); } return; } @@ -607,24 +609,19 @@ public class FailoverTransport implements CompositeTransport { boolean timedout = false; while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted() && willReconnect()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for transport to reconnect..: " + command); - } + + LOG.trace("Waiting for transport to reconnect..: {}", command); long end = System.currentTimeMillis(); if (command.isMessage() && timeout > 0 && (end - start > timeout)) { timedout = true; - if (LOG.isInfoEnabled()) { - LOG.info("Failover timed out after " + (end - start) + "ms"); - } + LOG.info("Failover timed out after {} ms", (end - start)); break; } try { reconnectMutex.wait(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Interupted: " + e, e); - } + LOG.debug("Interupted:", e); } transport = connectedTransport.get(); } @@ -650,7 +647,7 @@ public class FailoverTransport implements CompositeTransport { try { tracked = stateTracker.track(command); } catch (IOException ioe) { - LOG.debug("Cannot track the command " + command, ioe); + LOG.debug("Cannot track the command {} {}", command, ioe); } // If it was a request and it was not being tracked by // the state tracker, @@ -691,19 +688,14 @@ public class FailoverTransport implements CompositeTransport { } else { // Handle the error but allow the method to return since the // tracked commands are replayed on reconnect. - if (LOG.isDebugEnabled()) { - LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); - } + LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); handleTransportFailure(e); } } return; - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); - } + LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); handleTransportFailure(e); } } @@ -774,7 +766,7 @@ public class FailoverTransport implements CompositeTransport { } } catch (Exception e) { - LOG.error("Failed to parse URI: " + u); + LOG.error("Failed to parse URI: {}", u); } } @@ -818,9 +810,9 @@ public class FailoverTransport implements CompositeTransport { if (removed) { l.add(failedConnectTransportURI); } - if (LOG.isDebugEnabled()) { - LOG.debug("urlList connectionList:" + l + ", from: " + uris); - } + + LOG.debug("urlList connectionList:{}, from: {}", l, uris); + return l; } @@ -861,9 +853,7 @@ public class FailoverTransport implements CompositeTransport { tmpMap = new LinkedHashMap(requestMap); } for (Command command : tmpMap.values()) { - if (LOG.isTraceEnabled()) { - LOG.trace("restore requestMap, replay: " + command); - } + LOG.trace("restore requestMap, replay: {}", command); t.oneway(command); } } @@ -916,7 +906,7 @@ public class FailoverTransport implements CompositeTransport { } newUris = buffer.toString(); } catch (IOException ioe) { - LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); + LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe); } finally { if (in != null) { try { @@ -954,9 +944,7 @@ public class FailoverTransport implements CompositeTransport { doRebalance = false; return false; } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); - } + LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList); try { Transport transport = this.connectedTransport.getAndSet(null); @@ -964,9 +952,7 @@ public class FailoverTransport implements CompositeTransport { disposeTransport(transport); } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught an exception stopping existing transport for rebalance", e); - } + LOG.debug("Caught an exception stopping existing transport for rebalance", e); } } doRebalance = false; @@ -988,7 +974,7 @@ public class FailoverTransport implements CompositeTransport { backups.remove(bt); transport = bt.getTransport(); uri = bt.getUri(); - myTransportListener.onCommand(bt.getBrokerInfo()); + processCommand(bt.getBrokerInfo()); if (priorityBackup && priorityBackupAvailable) { Transport old = this.connectedTransport.getAndSet(null); if (old != null) { @@ -1023,19 +1009,17 @@ public class FailoverTransport implements CompositeTransport { transport = TransportFactory.compositeConnect(uri); } - if (LOG.isDebugEnabled()) { - LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); - } - transport.setTransportListener(myTransportListener); + LOG.debug("Attempting {}th connect to: {}", connectFailures, uri); + + transport.setTransportListener(createTransportListener(transport)); transport.start(); - if (started && !firstConnection) { + if (started && !firstConnection) { restoreTransport(transport); } - if (LOG.isDebugEnabled()) { - LOG.debug("Connection established"); - } + LOG.debug("Connection established"); + reconnectDelay = initialReconnectDelay; connectedTransportURI = uri; connectedTransport.set(transport); @@ -1058,33 +1042,26 @@ public class FailoverTransport implements CompositeTransport { if (transportListener != null) { transportListener.transportResumed(); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("transport resumed by transport listener not set"); - } + LOG.debug("transport resumed by transport listener not set"); } if (firstConnection) { firstConnection = false; - LOG.info("Successfully connected to " + uri); + LOG.info("Successfully connected to {}", uri); } else { - LOG.info("Successfully reconnected to " + uri); + LOG.info("Successfully reconnected to {}", uri); } return false; } catch (Exception e) { failure = e; - if (LOG.isDebugEnabled()) { - LOG.debug("Connect fail to: " + uri + ", reason: " + e); - } + LOG.debug("Connect fail to: {}, reason: {}", uri, e); if (transport != null) { try { transport.stop(); transport = null; } catch (Exception ee) { - if (LOG.isDebugEnabled()) { - LOG.debug("Stop of failed transport: " + transport + - " failed with reason: " + ee); - } + LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee); } } } finally { @@ -1098,7 +1075,7 @@ public class FailoverTransport implements CompositeTransport { connectFailures++; if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { - LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); + LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures); connectionFailure = failure; // Make sure on initial startup, that the transportListener has been @@ -1406,9 +1383,9 @@ public class FailoverTransport implements CompositeTransport { } catch(IOException e) { if (firstAddr == null) { - LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); + LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e); } else { - LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e); + LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e); } if (first.getHost().equalsIgnoreCase(second.getHost())) {